On automatic unsubscription of RxJava Subscribers

2016-07-27 · 7630 views
Reading the source code turns up the following:

```
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // ...

    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // ...
}
```

Inside Observable's subscribe, the subscriber that was passed in is checked: if it is not already a SafeSubscriber, it is wrapped in one.

Why wrap it in a SafeSubscriber? Look at the code:

```
public class SafeSubscriber<T> extends Subscriber<T> {

    private final Subscriber<? super T> actual;

    boolean done = false;

    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

    /**
     * Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.
     * <p>
     * The {@code Observable} will not call this method if it calls {@link #onError}.
     */
    @Override
    public void onCompleted() {
        if (!done) {
            done = true;
            try {
                actual.onCompleted();
            } catch (Throwable e) {
                // we handle here instead of another method so we don't add stacks to the frame
                // which can prevent it from being able to handle StackOverflow
                Exceptions.throwIfFatal(e);
                RxJavaPluginUtils.handleException(e);
                throw new OnCompletedFailedException(e.getMessage(), e);
            } finally {
                try {
                    // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
                    // and we throw an UnsubscribeFailureException.
                    unsubscribe();
                } catch (Throwable e) {
                    RxJavaPluginUtils.handleException(e);
                    throw new UnsubscribeFailedException(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Notifies the Subscriber that the {@code Observable} has experienced an error condition.
     * <p>
     * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onCompleted}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    @Override
    public void onError(Throwable e) {
        // we handle here instead of another method so we don't add stacks to the frame
        // which can prevent it from being able to handle StackOverflow
        Exceptions.throwIfFatal(e);
        if (!done) {
            done = true;
            _onError(e);
        }
    }

    /**
     * Provides the Subscriber with a new item to observe.
     * <p>
     * The {@code Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
     * {@link #onError}.
     *
     * @param args
     *          the item emitted by the Observable
     */
    @Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }

    /**
     * The logic for {@code onError} without the {@code isFinished} check so it can be called from within
     * {@code onCompleted}.
     *
     * @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
     */
    protected void _onError(Throwable e) {
        RxJavaPluginUtils.handleException(e);
        try {
            actual.onError(e);
        } catch (Throwable e2) {
            if (e2 instanceof OnErrorNotImplementedException) {
                /*
                 * onError isn't implemented so throw
                 *
                 * https://github.com/ReactiveX/RxJava/issues/198
                 *
                 * Rx Design Guidelines 5.2
                 *
                 * "when calling the Subscribe method that only has an onNext argument, the OnError behavior
                 * will be to rethrow the exception on the thread that the message comes out from the observable
                 * sequence. The OnCompleted behavior in this case is to do nothing."
                 */
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
                }
                throw (OnErrorNotImplementedException) e2;
            } else {
                /*
                 * throw since the Rx contract is broken if onError failed
                 *
                 * https://github.com/ReactiveX/RxJava/issues/198
                 */
                RxJavaPluginUtils.handleException(e2);
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    RxJavaPluginUtils.handleException(unsubscribeException);
                    throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
                }

                throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
            }
        }
        // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
        try {
            unsubscribe();
        } catch (RuntimeException unsubscribeException) {
            RxJavaPluginUtils.handleException(unsubscribeException);
            throw new OnErrorFailedException(unsubscribeException);
        }
    }

    /**
     * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
     *
     * @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}
     */
    public Subscriber<? super T> getActual() {
        return actual;
    }
}
```

The code shows that SafeSubscriber calls unsubscribe() in both onCompleted and onError (the latter through _onError). unsubscribe() is a Subscriber method that cancels the subscription, and SafeSubscriber extends Subscriber. Because the constructor calls super(actual), the wrapper shares its subscription list with the original subscriber (this is how the Subscriber(Subscriber) constructor behaves in RxJava 1.x), so unsubscribing the wrapper also unsubscribes the subscriber we passed in.

In other words, once a sequence completes or fails, we do not have to worry about problems caused by creating many subscribers and not unsubscribing them in time. This only covers sequences that terminate: a subscription to a sequence that never calls onCompleted or onError still has to be unsubscribed by hand.
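To make the wrap-then-unsubscribe pattern easier to see in isolation, here is a minimal sketch that does not depend on RxJava at all. `MiniSubscriber`, `MiniSafeSubscriber`, `RecordingSubscriber` and `AutoUnsubscribeDemo` are hypothetical, heavily simplified stand-ins invented for this illustration; they are not RxJava classes and leave out all of the error handling shown above. The real SafeSubscriber shares a subscription list with the wrapped subscriber, whereas this sketch simply forwards unsubscribe() to it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for rx.Subscriber (NOT the RxJava class).
abstract class MiniSubscriber<T> {
    private volatile boolean unsubscribed;

    abstract void onNext(T item);
    abstract void onError(Throwable e);
    abstract void onCompleted();

    void unsubscribe() { unsubscribed = true; }
    boolean isUnsubscribed() { return unsubscribed; }
}

// Hypothetical wrapper mirroring the idea behind SafeSubscriber:
// deliver at most one terminal event, then unsubscribe.
final class MiniSafeSubscriber<T> extends MiniSubscriber<T> {
    private final MiniSubscriber<? super T> actual;
    private boolean done;

    MiniSafeSubscriber(MiniSubscriber<? super T> actual) { this.actual = actual; }

    @Override void onNext(T item) {
        if (!done) actual.onNext(item);   // items after a terminal event are dropped
    }

    @Override void onError(Throwable e) {
        if (done) return;
        done = true;
        try { actual.onError(e); } finally { unsubscribe(); }
    }

    @Override void onCompleted() {
        if (done) return;
        done = true;
        try { actual.onCompleted(); } finally { unsubscribe(); }
    }

    // Assumption of this sketch: forward the call instead of sharing a subscription list.
    @Override void unsubscribe() {
        super.unsubscribe();
        actual.unsubscribe();
    }
}

// Records every event it receives so the behaviour can be inspected.
final class RecordingSubscriber extends MiniSubscriber<String> {
    final List<String> events = new ArrayList<>();

    @Override void onNext(String item) { events.add("next:" + item); }
    @Override void onError(Throwable e) { events.add("error:" + e.getMessage()); }
    @Override void onCompleted() { events.add("completed"); }
}

final class AutoUnsubscribeDemo {
    // Plays the role of Observable.subscribe: wrap if needed, then emit a fixed sequence.
    static MiniSubscriber<String> subscribe(MiniSubscriber<String> subscriber) {
        MiniSubscriber<String> safe = subscriber;
        if (!(subscriber instanceof MiniSafeSubscriber)) {
            safe = new MiniSafeSubscriber<String>(subscriber);
        }
        safe.onNext("a");
        safe.onNext("b");
        safe.onCompleted();
        safe.onNext("late");              // ignored: the sequence already terminated
        return safe;
    }

    public static void main(String[] args) {
        RecordingSubscriber recorder = new RecordingSubscriber();
        subscribe(recorder);
        // prints: [next:a, next:b, completed] unsubscribed=true
        System.out.println(recorder.events + " unsubscribed=" + recorder.isUnsubscribed());
    }
}
```

The caller never calls unsubscribe() itself, yet the recorder ends up unsubscribed as soon as the terminal event has been delivered, which is the behaviour the post describes for SafeSubscriber.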