آموزشگاه برنامه نویسی تحلیل داده
آموزشگاه برنامه نویسی تحلیل داده

استفاده از RxJava 2.0

دوره های مرتبط با این مقاله

آموزش RxJava 2.0

آموزش RxJava و شرح مفهوم برنامه نویسی Reactive یا ناهمزمان

هر برنامه ای که دارای چهار ویژگی یا پارامتر (قابلیت مدیریت خطاها و ارائه ی بهترین سرویس) resilience، (انعطاف و مقایس پذیری) scalability، (واکنش گرا و تعاملی) responsive و (رویداد محور) Event-driven باشد را در اصطلاح reactive می نامند. در مدل برنامه نویسی reactive، سرویس گیرنده یا کاربر به محض ورود داده به آن واکنش نشان می دهند. به همین خاطر برنامه نویسی asynch را برنامه نویسی reactive نیز می خوانند. مدل برنامه نویسی reactive این امکان را فراهم می آورد تا تغییرات event ها را به observer ها (توابع) گوش فرا دهنده و ثبت شده، منتشر (propagate) و اعمال نمایید.
در واقع RxJava امکان پیاده سازی وظایف غیرهمزمان را به راحتی برای توسعه دهنده فراهم می آورد.
RxJava پیاده سازی از ReativeX برای Java می باشد که NetFlix توسعه داده و در بین برنامه نویسان جاوا محبوبیت زیادی پیدا کره است. Rxjava در سال 2014 کد باز (open source) شده و تحت آدرس http://reactivex.io/ میزبانی می شود. ورژن جاوایی آن تحت https://github.com/ReactiveX/RxJava قابل دسترسی می باشد. این کتابخانه تحت لیسانس Apache 2.0 منتشر می شود.
RxJava خود را تحت عنوان یک API که برای برنامه نویسی ناهمزمان طراحی شده (تعریف تسک ها و وظایف ناهمزمان) و از stream observable (آبجکت های observable اطلاعاتی را منتشر کرده و آن ها را به توابع گوش فرادهنده به تغییرات که همان observer ها هستند ارسال می کنند) استفاده می کند، معرفی می نماید.

آموزش اضافه کردن کتابخانه RxJava 2.0

از زمان نگارش این مقاله، ویرایش جدیدتری از کتابخانه ی RxJava ارائه شده است. مقدار g.a.y را با 2.0.1 یا نسخه ی جدیدتر جایگزین نمایید.
برای افزودن کتابخانه ی مزبور ویژه ی سیستم کامپایل Maven، می توانید تکه کد زیر را در بخش dependency ها اضافه نمایید.


                io.reactivex.rxjava2
                rxjava
                g.a.v
 

برای سیستم کامپایل Gradle، می توانید RxJava را از طریق دستور زیر اضافه نمایید.

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'

آموزش برنامه نویسی ناهمزمان (Async)

امروزه برنامه نویسی به سبک دستوری/imperative طوری که اپلیکیشن در لحظه تنها یک عملیات را پردازش کند (single-thread) دیگر به هیچ وجه کارامد نیست و ممکن است UI های unresponsive را مسدود کند و در نهایت تجربه ی کاربری ضعیفی را در پی داشته باشد.
با مدیریت رخدادهای پیش بینی نشده به صورت ناهمزمان (async)، می توان از این اتفاق جلوگیری کرد. برای مثال، چنانچه لازم باشد منتظر فراخوانی وب سرویس یا کوئری دیتابیس باشید، اگر شبکه پاسخگو نبود (responsive)، قطعا اپلیکیشن هنگ خواهد کرد.
مثالی در این زمینه:

public List getTodos() {
        List todosFromWeb = // query a webservice (with bad network latency)
        return todosFromDb;
}

فراخوانی متد getTodos() از thread اصلی یا یک UI thread سبب می شود تا زمان رسیدن todosFromWeb، اپلیکیشن nonresponsive بوده و تجربه ی کاربری ضعیفی برای کاربر رقم خورده شود.
برای بهبود کارایی این query که مدت زمان اجرای آن مشخص نیست، می بایست آن را در یک thread دیگر اجرا نموده و سپس به هنگام رسیدن نتیجه، thread اصلی را مطلع نمایید.

public void getTodos(Consumer> todosConsumer) {
        Thread thread = new Thread(()-> {
                List todosFromWeb = // query a webservice
                todosConsumer.accept(todosFromWeb);
        });
        thread.start();
}

حال پس از فراخوانی متد getTodos(Consumer> todosConsumer)، نخ یا thread اصلی می تواند بدون متوقف شدن همچنان به اجرا ادامه دهد و زمانی که متد accept از consumer صدا خورده شد، واکنش نشان دهد.

آموزش Observable ها، Observer ها و Subscription ها

RxJava از مفهوم Observable ها و Observer ها استفاده می کند. Observer ها توابعی هستند که با گوش فراخوانی و subscribe کردن به Observable ها که آبجکت هستند، از تغییرات رخ داده بلافاصله آگاه شده و آن تغییرات را در خود منعکس می نمایند. Observer ها به مجرد اینکه یک observable مقداری را emit (ارسال) می کند، از آن مطلع می شوند. همچنین زمانی که observable اطلاعاتی را مبنی بر اینکه دیگر مقادیری وجود ندارد ارسال می کند، observer ها از آن مطلع می شوند. زمانی هم که observable با خطایی مواجه می شود، متد onError(Throwable e) خورده شده و باز observer از آن آگاه می شود. توابع مربوطه onNext()، onError() و onCompleted() هستند که همگی از interface ای به نام Observer ارث بری می شوند. آبجکت یا نمونه ای از Subscription بیانگر اتصال و ارتباط بین یک observer و observable است. اگر متد unsubscribe() را بر روی این نمونه صدا بزنید، observable از عضویت متد بیرون آمده و اتصال بین آن ها قطع می شود. این متد می تواند زمانی مفید باشد که شما می خواهید بروز رسانی و آپدیت یک المان رابط کاربری یا widget خاتمه یافته (dispose) و از حافظه حذف شود.

آموزش thread اجرا و یک thread برای گوش دادن به تغییرات (observe)

می توانید thread ای که observable در آن قرار است اجرا شود را با فراخوانی متد subscribeOn() تعریف نمایید و thread ای که observer ها در آن اجرا می شوند را با اجرای متد observeOn() اعلان کنید.

آموزش Operator ها

می توانید عملیاتی را بر روی observer های خود ثبت کنید که به شما اجازه می دهد نتیجه و emission یک observable را پیش از ارسال آن به observer ویرایش و دستکاری نمایید. متد map به شما این امکان را می دهد یک Func1 ثبت کنید که ورودی را ترجمه و تبدیل می کند.

آموزش استفاده از delay و به تعویق انداختن تولید و ارسال نتیجه

با فراخوانی متد debounce(delay, TimeUnit.MILLISECONDS) بر روی observer، می توانید اعلان کنید که observer تنها زمانی تغییرات را ارسال یا به عنوان نتیجه ارسال (emit) نماید که مقدار مورد نظر پس از گذشت مدت زمان از پیش تعیین شده تغییر نکرده باشد.

آموزش ایجاد Observable ها و Observer ها

RxJava متدهای زیادی برای ایجاد observable در اختیار توسعه دهنده قرار می دهد. این متدها عبارت اند از:

  • Observable.just() – یک Observable به عنوان ظرف ایجاد می کند که انواع داده ای را دربرمی گیرد. به عبارت دیگر این متد یک آبجکت را به observable ارسال می کند یا یک یا چند object را به یک Observable تبدیل می کند و سپس Observable آن آبجکت یا آبجکت ها را به observer می فرستد.
  • Observable.from() – یک collection یا آرایه گرفته، مقادیر آن ها را به ترتیب موجود در ساختار داده ای مربوطه، emit یا به عنوان نتیجه ارسال می کند. در واقع این متد یک آرایه دریافت کرده، به ازای هر المان در ساختار داده ای مزبور، تابع observer را صدا می زند و مقدار را می فرستد.
  • Observable.fromCallable() – به شما امکان ساخت یک observable برای Callable را می دهد.
    به منظور ساخت observer ها:
  • Action1 را پیاده سازی نمایید – به شما این امکان را می دهد تا observer ساده ایجاد کنید که یک متد به نام call دارد. در صورت تولید یا ارسال (emit) آبجکت جدید، این متد صدا خورده می شود.

مثالی از فراخوانی متد Observable.just()

تابع Observable.just() یک Observable ایجاد می کند و زمانی که یک observer به این Observable گوش می دهد یا برای آن subscribe می کند، متد onNext() از Observer با آرگومان ورودی Observable.just() فراخوانی می شود.

import java.util.Arrays;
import java.util.List;
import rx.Observable;
public class RxJavaExample {
        public static void main(String[] args) {
                List list = Arrays.asList("Android", "Ubuntu", "Mac OS");    1
                Observable> listObservable = Observable.just(list);  2
                listObservable.subscribe(new Observer>() {
						    3
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onNext(List list) {
                                System.out.println(list);
            }
        });
        }
}
  1. یک لیست ایجاد می کند.
  2. Observable را ایجاد می کند.
  3. Observer را برای گوش دادن به Observable ثبت می کند.

آموزش انجام عملیات تبدیل

در زیر مثال ساده ای از RxJava را مشاهده می کنید که نوعی عملیات تبدیل در آن رخ داده است.

import java.util.Arrays;
import java.util.List;
import rx.Observable;
public class RxJavaExample {
        public static void main(String[] args) {
                List list = Arrays.asList("Hello", "Streams", "Not");
                Observable.from(list).
                                filter(s -> s.contains("e")).
                                map(s -> s.toUpperCase()).
                                reduce(new StringBuilder(), StringBuilder::append).
                                subscribe(System.out::print, e -> {},
                                () -> System.out.println("!"));
        }
}

آموزش Subject ها

Subject ها در واقع یک نوع پل ارتباطی یا پیشکار (proxy) هستند که خود هر دو نقش Observable و Observer را یکجا ایفا می کنند. Subject ها را می توان به عنوان یک pipe یا پل ارتباطی که داده ها را تبدیل و ترجمه می کنند در نظر گرفته و مورد استفاده قرار داد. به عنوان مثال می توان به PublishSubject اشاره کرد. همین که داده ای به PublishSubject ارسال می شود، بلافاصله این داده به بیرون ارسال می گردد.
به عبارت دیگر Subject یک آبجکت در RxJava هست که از تمامی ویژگی ها و property های Observable و Observer/Subscriber برخوردار می باشد. هم می تواند به Observable ها گوش داده و از آن ها داده تحویل بگیرد و هم قادر است داده ها را به Observer هایی که به آن گوش می دهند، داده به عنوان خروجی ارسال کند. همچنین می تواند داده هایی را با فراخوانی مستقیم متد onNext()ارسال کند. از این طریق، Subject می تواند نقش واسط را ایفا کند که داده ها را دریافت کرده، عملیاتی را بروی آن انجام می دهد و سپس آن را به گوش فراخوان یا subscriber های خود ارسال می کند.

آموزش آبجکت Single یا همان Promise (مکان نگهدار مقدار مورد نظر)

مدل برنامه نویسی reactive برای برقراری ارتباط دو طرفه و واکنشی از دو مفهوم Observer ها و Observable ها استفاده می کند.
امروزه استفاده از promise ها در فراخوانی های ناهمزمان (async) به ویژه همراه با زبان سمت سرویس گیرنده ی JavaScript، از محبوبیت ویژه و کاربرد فراوانی برخودار شده است. Promise در اصل جانگهدار یا placeholder برای یک مقدار دلخواه است که observer ها یا متدهای گوش فرا دهنده ی خود را به مجرد دریافت مقدار مورد انتظار و به اصطلاح موعود با خبر می سازد.
RxJava نیز آبجکت یا نوعی به نام Single دارد که از لحاظ کاربرد بسیار شبیه به Promise است.
نمونه یا آبجکت هایی که از کلاس Single ساخته می شود در واقع وظیفه ای مشابه وظیفه ی آبجکت Observable ایفا می نماید، با این تفاوت که دو متد بازفراخوان بیشتر به نام های onSuccess() و onError() ندارد.

import io.reactivex.Single;
public Single> getTodos() {
        return Single.create(emitter -> {
                Thread thread = new Thread(() -> {
                        try {
                                List todosFromWeb = // query a webservice
                                emitter.onSuccess(todosFromWeb);
                        } catch (Exception e) {
                                emitter.onError(e);
                        }
                });
                thread.start();
        });
}

نحوه ی استفاده از نمونه ی کلاس Single در قطعه کد زیر به نمایش گذاشته شده است.

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
Single> todosSingle = getTodos();
todosSingle.subscribeWith(new DisposableSingleObserver>() {
        @Override
        public void onSuccess(List todos) {
                // work with the resulting todos
        }
        @Override
        public void onError(Throwable e) {
                // handle the error case
        }
});

آموزش دور انداختن subscription ها (قطع اتصال بین observer و observable) و استفاده از کلاس CompositeDisposable

گوش فراخوان ها یا subscriber هایی که به آبجکت مربوطه متصل بوده و از تغییرات آن آگاه می شوند، طبیعتا قرار نیست تا ابد به همین کار ادامه دهند. برای مثال هم ممکن است به خاطر تغییر کوچکی در وضعیت، event ارسالی از کلاس Observable دیگر مورد نیاز و درخور توجه نباشد.

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
Single> todosSingle = getTodos();
Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver>() {
        @Override
        public void onSuccess(List todos) {
                // work with the resulting todos
        }
        @Override
        public void onError(Throwable e) {
                // handle the error case
        }
});
// continue working and dispose when value of the Single is not interesting any more
disposable.dispose();
توجه:

کلاس Single و دیگر کلاس های observable توابع متعددی جهت subscribe و گوش فراخوانی به تغییرات ارائه می دهند که همگی در خروجی یک آبجکت Disposable بازگردانی می نمایند.

به هنگام کار با چندین subscription (نشانگر ارتباط دو طرفه بین observable و observer) که ممکن است به دلیل تغییر در وضعیت، رخداد ارسالی از آن ها دیگر لازم و جالب توجه نباشد، می توان با بهره گیری از کلاس CompositeDisposable همزمان چندین subscription را دور انداخته و به اصطلاح از حافظه پاک نمود (dispose).

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;
CompositeDisposable compositeDisposable = new CompositeDisposable();
Single> todosSingle = getTodos();
Single happiness = getHappiness();
compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver>() {
        @Override
        public void onSuccess(List todos) {
                // work with the resulting todos
        }
        @Override
        public void onError(Throwable e) {
                // handle the error case
        }
}));
compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver>() {
        @Override
        public void onSuccess(Happiness happiness) {
                // celebrate the happiness :-D
        }
        @Override
        public void onError(Throwable e) {
                System.err.println("Don't worry, be happy! :-P");
        }
}));
// continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
compositeDisposable.dispose();

آموزش ذخیره ی موقتی مقدار Observable های ارسال شده (completed obervables)

هنگام کار با چند Observable و فراخوانی های ناهمزمان تمامی subscription های یک observable (observer هایی که برای گوش دادن به آن ثبت شده اند) ضروری نبوده و منابع زیادی را مصرف می کند.
Observable ها معمولا به بخش های مختلف اپلیکیشن پاس داده می شوند و هر بار لازم نیست با اضافه شدن یک subscription (و گوش دادن یک observer به observable) این فراخوانی سنگین اتفاق بیافتد.
کد زیر چهار با از یک وب سرویس کوئری می گیرد، هر چند یک بار کوئری گرفتن هم کفایت می کند. در واقع همان آبجکت های Todo نمایش داده می شوند اما هر بار به روش های مختلف.

Single> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
                try {
                        List todosFromWeb = // query a webservice
                        System.out.println("Called 4 times!");
                        emitter.onSuccess(todosFromWeb);
                } catch (Exception e) {
                        emitter.onError(e);
                }
        });
        thread.start();
});
todosSingle.subscribe(... " Show todos times in a bar chart " ...);
showTodosInATable(todosSingle);
todosSingle.subscribe(... " Show todos in gant diagram " ...);
anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);

تکه کد بعدی از متد cache استفاده می کند، به همین خاطر آبجکت Single، پس از اینکه ارسال برای بار اول با موفقیت انجام شد، در حافظه ی موقت ذخیره می شود و از وب سرویس تنها یک بار کوئری گرفته می شود.

Single> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
                try {
                        List todosFromWeb = // query a webservice
                        System.out.println("I am only called once!");
                        emitter.onSuccess(todosFromWeb);
                } catch (Exception e) {
                        emitter.onError(e);
                }
        });
        thread.start();
});
// cache the result of the single, so that the web query is only done once
Single> cachedSingle = todosSingle.cache();
cachedSingle.subscribe(... " Show todos times in a bar chart " ...);
showTodosInATable(cachedSingle);
cachedSingle.subscribe(... " Show todos in gant diagram " ...);
anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);

آموزش استفاده از Flowable و Backpressure

در RxJava ممکن است به کرات با شرایطی مواجه شوید که در آن تعداد آیتم های ارسالی از یک Observable بیشتر از میزانی است که یک Observer قادر به استفاده از آن هست. در چنین شرایطی این مسئله مطرح می شود که چگونه می توان این انباشت آیتم های استفاده نشده را مدیریت نمود. به این رخداد در اصطلاح برنامه نویسی back pressure گفته می شود.
RxJava 2.0 یک نوع جدید به نام Flowable می باشد با این تفاوت که Flowable قابلیت مدیریت back pressure دارد و Observable همچنین امکانی را فراهم نمی کند.
در RxJava 1.0 این مفهوم کمی دیر به نوع Observable اضافه شد. اما یک مشکل وجود داشت: برخی خطای زمان اجرای MissingBackpressureException را می دادند. به همین خاطر Flowable اضافه شده و این مشکل را برطرف نمود.
جدا از نوع Observable، نوع هایی همچون Maybe، Single و Completable نیز back pressure را اصلا پشتیبانی نمی کنند.

تبدیل نوعی به نوع دیگر

می توان به راحتی یک نوع از RxJava را به نوع دیگر تبدیل نمود.

Table 1. Conversion between types
Completable
Single
Maybe
Observable
Flowable
From / To
ignoreElements()
scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…)
reduce()
elementAt()
firstElement()
lastElement()
singleElement()
toObservable()
Flowable
ignoreElements()
scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…)
reduce()
elementAt()
firstElement()
lastElement()
singleElement()
toFlowable()
Observable
toCompletable()
toSingle()
sequenceEqual()
toObservable()
toFlowable()
Maybe
toCompletable()
toMaybe()
toObservable()
toFlowable()
Single
toSingle()
toSingleDefault()
toMaybe()
toObservable()
toFlowable()
Completable

آموزش و تست Observable و Subscription های RxJava

آموزش و تست observable ها

کتابخانه ی RxJava یک کلاس به نام TestSubscriber در اختیار توسعه دهنده قرار می دهد که با استفاده از آن می توان یک observable را تست نمایید.

Observable obs = ...// assume creation code here
TestSubscriber testSubscriber = new TestSubscriber<>();
obs.subscribe(testSubscriber);
testSubscriber.assertNoErrors();
List chickens = testSubscriber.getOnNextEvents();
// TODO assert your string integrity...
                    

RxJava مکانیزمی را در اختیار شما قرار می دهد که به واسطه ی آن می توانید scheduler های ارائه (exposed) شده را طوری بازنویسی نمایید که به صورت همزمان (sync) فراخوانی شوند. برای مثالی در این زمینه می توانید به آدرس http://fedepaol.github.io/blog/2015/09/13/testing-rxjava-observables-subscriptions/ مراجعه نمایید.

  • 3262
  •    1228
  • تاریخ ارسال :   1395/12/08

دانلود PDF دانشجویان گرامی اگر این مطلب برای شما مفید بود لطفا ما را در GooglePlus محبوب کنید
رمز عبور: tahlildadeh.com یا www.tahlildadeh.com
ارسال دیدگاه نظرات کاربران
شماره موبایل دیدگاه
عنوان پست الکترونیک

ارسال

آموزشگاه برنامه نویسی تحلیل داده
آموزشگاه برنامه نویسی تحلیل داده

تمامی حقوق این سایت متعلق به آموزشگاه تحلیل داده می باشد .