RxJava 学习笔记

为什么要学RxJava

  1. 最大的好处在于,很优雅处理各种线程切换,能使整个代码块的逻辑连贯,易于阅读和维护;
  2. 提供丰富的操作符, 能使很多复杂的操作简单几步搞定;
  3. 运用响应式编程,使整个编程更流畅简洁;

开源库


RxJava     RxAndroid

Rx Sample

Rx 相关的开源工具库

  • RxAnimations
  • RxBinding
  • rx-preferences Reactive SharedPreferences for Android
  • requery 支持RxJava Kotlin / Android ORM库
  • RxLifecycle 处理由Activity 或 Fragment生命周期,导致没有完成的订阅(subscriptions)触发的内存泄露
  • Frodo 模仿Jake Wharton’s Hugo,使用Java切面编程的Android 日志工具库
  • RxIAPv3 一个用Rx封装后的 Android App内购支付库,提供几个购买、消费和商品清单列表的Rx方法。
  • RxFile
  • RxCamera RxJava style API for android camera
  • RxPermissions Android runtime permissions powered by RxJava

Resource

官方文档

翻译文档

一些高质量学习资源

操作符


操作符相关文章

throttleFirst():

RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS)

FlatMap无序,concatMap有序排列

FlatMap应用场景: 1. 抽取对象的集合并逐个输出; 2. 嵌套回调解决回调地狱的问题;

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

first() vs. takeFirst()

The difference between the two calls is that first()
will throw a NoSuchElementException
if none of the sources emits valid data, whereas takeFirst()
will simply complete without exception.

.groupBy()

一旦产生的Observable被订阅,分组产生的GroupedObservable就开始缓存,没有被订阅(处理)的GroupedObservable可能会引起内存泄露,
故,对不想处理的GroupedObservable,使用take(0)释放缓存;

combineLatest

笔记

操作符分类
合并型
combineLatest
join
merge
mergeDelayError
switchOnNext操作符是把一组Observable转换成一个Observable
zip操作符是把两个observable提交的结果,严格按照顺序进行合并
截取添加分类型
startWith

buffer
debounce
window
groupBy
distinct

elementAt
filter
ofType
first
last
single
sample
skip
skipLast
take
错误处理
onErrorReturn
onErrorResumeNext return Observerable
onExceptionResumeNext

改变流程走向
retry
retryWhen

其他
concatMap
cast
scan
ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者

进价问题研究


线程控制 Scheduler

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
    有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
  • subscribeOn() vs observeOn() 对事情线程的影响;

    subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

  • doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。
    Observable.create(onSubscribe)
      .subscribeOn(Schedulers.io())
      .doOnSubscribe(new Action0() {
          @Override
          public void call() {
              progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
          }
      })
      .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(subscriber);

Backpressure 【待研究】

生产速度比消费快,抛出MissingBackpressureException的异常
官方对Backpressure的介绍及对应的解决方案

使用案例


省略subscribeOn()和observeOn()在切换线程时

在Android开发的时候频繁使用subscribeOn()和observeOn(),后台线程和UI线程切换
这可以可以抽离出来(文章:避免打断链式结构:使用.compose( )操作符

 <T> Transformer<T, T> applySchedulers() {
 return new Transformer<T, T>() {
   @Override
   public Observable<T> call(Observable<T> observable) {
     return observable.subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread());
   }
 };
}

使用debounce做textSearch

用简单的话讲就是当N个结点发生的时间太靠近(即发生的时间差小于设定的值T),debounce就会自动过滤掉前N-1个结点。
解决频繁改变,导致的多余的网络访问(场景举例:删除多余信息)

RxTextView.textChangeEvents(inputEditText)
      .debounce(400, TimeUnit.MILLISECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Observer<TextViewTextChangeEvent>() {
    @Override
    public void onCompleted() {
        log.d("onComplete");
    }

    @Override
    public void onError(Throwable e) {
        log.d("Error");
    }

    @Override
    public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
        log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
    }
});

RxJava代替EventBus,Otto:RxBus

延迟

Observable.timer(2, TimeUnit.SECONDS)
              .subscribe(new Observer<Long>() {
                  @Override
                  public void onCompleted() {
                      log.d ("completed");
                  }

                  @Override
                  public void onError(Throwable e) {
                      log.e("error");
                  }

                  @Override
                  public void onNext(Long number) {
                      log.d ("hello world");
                  }
              });

schedulePeriodically轮询

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> observer) {

                Schedulers.newThread().createWorker()
                      .schedulePeriodically(new Action0() {
                          @Override
                          public void call() {
                              observer.onNext(doNetworkCallAndGetStringResult());
                          }
                      }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                log.d("polling….));
            }
        })

使用RxJava处理复杂的Url操作

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))  //拆分List
    .flatMap(url -> getTitle(url)) // 获取网站头名称
    .filter(title -> title != null) // 过滤掉title == null
    .take(5) // 取前5个
    .doOnNext(title -> saveTitle(title)) // 在得到结果前保存标题到Disk里
    .subscribe(title -> System.out.println(title));

按顺序从内存–>文件–>网络获取资源,成功程序停止

final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (memoryCache != null) {
            subscriber.onNext(memoryCache);
        } else {
            subscriber.onCompleted();
        }
    }
});


Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
    memoryCache = "memory";
    System.out.println("--------------subscribe: " + s);
});

多个异步并发处理完,再更新

  Observable.merge(observable1, observable2)
            .subscribeOn(Schedulers.newThread())
            .subscribe(System.out::println);

flatMap处理异步嵌套异步(Callback Hell)

NetworkService.getToken("username", "password")
    .flatMap(s -> NetworkService.getMessage(s))
    .subscribe(s -> {
        System.out.println("message: " + s);
    });

按钮防抖

RxView.clicks(findViewById(R.id.btn_throttle))
    .throttleFirst(1, TimeUnit.SECONDS)
    .subscribe(aVoid -> {
        System.out.println("click");
    });

响应式的界面

勾选了某个checkbox,自动更新对应的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
        .subscribe(checked.asAction());

Fragment 旋转缓存【待研究】

  @Override public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    /*.cache()操作符: 当第一个subscribe订阅的时候,才会连接原始Observable,缓存事件,
      重发给后续订阅的subscribe 值得注意的事,
       它和使用了.replay().publish()操作符的ConnectableObservable的不同。
       另外,为了避免内存开销,不建议缓存大量事件*/
    setRetainInstance(true);
    cacheObservable = weatherManager.getWeather().cache();
  }

  @Override public void onViewCreated(View view, Bundle savedInstanceState) {
    super.onViewCreated(view, savedInstanceState);
    cacheObservable.subscribe(/*your subscribe*/);
  }

测试调试 【待研究】


实战笔记


RxJava-Android-Samples
RetrofitFragment.java
Observable.zip / Observable.just gist code

DebounceSearchEmitterFragment.java
RxTextView.textChangeEvents gitst code

BufferDemoFragment.java
RxView.clickEvents/ .buffer(2, TimeUnit.SECONDS) //收集2s内的点击事件到List中

 private Subscription _getBufferedSubscription() {

        return RxView.clickEvents(_tapBtn)
              .map(new Func1<ViewClickEvent, Integer>() {
                  @Override
                  public Integer call(ViewClickEvent onClickEvent) {
                      Timber.d("--------- GOT A TAP");
                      _log("GOT A TAP");
                      return 1;
                  }
              })
              .buffer(2, TimeUnit.SECONDS) //收集2s内的点击事件到List中
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(new Observer<List<Integer>>() {} );

PublishSubject

 PublishSubject<Float>   _resultEmitterSubject = PublishSubject.create();
        _subscription = _resultEmitterSubject.asObservable().subscribe(new Action1<Float>() {
            @Override
            public void call(Float aFloat) {
                _result.setText(String.valueOf(aFloat));
            }
        });
         _resultEmitterSubject.onNext(num1 + num2);
         @OnTextChanged

PollingFragment.java
CompositeSubscription / interval /doOnSubscribe /repeatWhen /Observable.timer

       _subscriptions.add(//
              Observable.just(1)
                    .repeatWhen(new RepeatWithDelay(pollCount, pollingInterval))
                    .subscribe(new Action1<Object>() {
                        @Override
                        public void call(Object o) {
                            _log(String.format(Locale.US, "Executing polled task now time : [xx:%02d]",
                                  _getSecondHand()));
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable e) {
                            Timber.d(e, "arrrr. Error");
                        }
                    })
        );
            _subscriptions.add(//
              Observable.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, String>() {
                        @Override
                        public String call(Long heartBeat) {
                            return _doNetworkCallAndGetStringResult(heartBeat);
                        }
                    }).take(pollCount)
                    .doOnSubscribe(new Action0() {
                        @Override
                        public void call() {
                            _log(String.format("Start simple polling - %s", _counter));
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String taskName) {
                            _log(String.format(Locale.US, "Executing polled task [%s] now time : [xx:%02d]",
                                  taskName, _getSecondHand()));
                        }
                    })
        );
    public class RepeatWithDelay
          implements Func1<Observable<? extends Void>, Observable<?>> {

        private final int _repeatLimit;
        private final int _pollingInterval;
        private int _repeatCount = 1;

        RepeatWithDelay(int repeatLimit, int pollingInterval) {
            _pollingInterval = pollingInterval;
            _repeatLimit = repeatLimit;
        }

        // this is a notificationhandler, all we care about is
        // the emission "type" not emission "content"
        // only onNext triggers a re-subscription

        @Override
        public Observable<?> call(Observable<? extends Void> inputObservable) {

            // it is critical to use inputObservable in the chain for the result
            // ignoring it and doing your own thing will break the sequence

            return inputObservable.flatMap(new Func1<Void, Observable<?>>() {
                @Override
                public Observable<?> call(Void blah) {


                    if (_repeatCount >= _repeatLimit) {
                        // terminate the sequence cause we reached the limit
                        _log("Completing sequence");
                        return Observable.empty();
                    }

                    // since we don't get an input
                    // we store state in this handler to tell us the point of time we're firing
                    _repeatCount++;

                    return Observable.timer(_repeatCount * _pollingInterval,
                          TimeUnit.MILLISECONDS);
                }
            });
        }
    }

RxBusDemoFragment.java
ConnectableObservable / SerializedSubject
gist代码

private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
_bus.onNext(object);
_bus.hasObservers();
ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();
   _subscriptions//
              .add(tapEventEmitter.subscribe(new Action1<Object>() {
                  @Override
                  public void call(Object event) {
                      if (event instanceof RxBusDemoFragment.TapEvent) {
                          _showTapText();
                      }
                  }
              }));
 stream.buffer(stream.debounce(1, TimeUnit.SECONDS));
 _subscriptions.add(tapEventEmitter.connect());

FormValidationCombineLatestFragment.java

private Observable<CharSequence> _numberChangeObservable = RxTextView.textChanges(_email_editText).skip(1);
 _subscription = Observable.combineLatest(_emailChangeObservable,
              _passwordChangeObservable,
              _numberChangeObservable, new Fun3...)

PseudoCacheMergeFragment.java
Observable.merge();
TimingDemoFragment.java

Observable.timer(2, TimeUnit.SECONDS)//
              //.just(1).delay(2, TimeUnit.SECONDS)//
              //.interval(1, TimeUnit.SECONDS)//
              //.interval(0, 1, TimeUnit.SECONDS)//
              //.interval(3, TimeUnit.SECONDS).take(5)//

ExponentialBackoffFragment.java

  Observable//
                    .error(new RuntimeException("testing")) // always fails
                    .retryWhen(new RetryWithDelay(5, 1000)) //当错误时调用,适用于错误处理
 //1-4 指数的递增延迟
Observable.range(1, 4)//
                    .delay(new Func1<Integer, Observable<Integer>>() {
                        @Override
                        public Observable<Integer> call(final Integer integer) {
                            // Rx-y way of doing the Fibonnaci :P
                            return MathObservable//
                                  .sumInteger(Observable.range(1, integer))
                                  .flatMap(new Func1<Integer, Observable<Integer>>() {
                                      @Override
                                      public Observable<Integer> call(Integer targetSecondDelay) {
                                          return Observable.just(integer)
                                                .delay(targetSecondDelay, TimeUnit.SECONDS);
                                      }
                                  });
                        }
                    })//
RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释
    .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms
    .subscribe(subscriber);

其他


RxJS

参考