مشخصات مقاله
-
2171
-
0.0
-
6769
-
0
-
0
استفاده از RxJava 2.0
آموزش RxJava 2.0
آموزش RxJava و شرح مفهوم برنامه نویسی Reactive یا ناهمزمان
هر برنامه ای که دارای چهار ویژگی یا پارامتر (قابلیت مدیریت خطاها و ارائه ی بهترین سرویس) resilience، (انعطاف و مقایس پذیری) scalability، (واکنش گرا و تعاملی) responsive و (رویداد محور) Event-driven باشد را در اصطلاح reactive می نامند. در مدل برنامه نویسی reactive، سرویس گیرنده یا کاربر به محض ورود داده به آن واکنش نشان می دهند. به همین خاطر برنامه نویسی asynch را برنامه نویسی reactive نیز می خوانند. مدل برنامه نویسی reactive این امکان را فراهم می آورد تا تغییرات event ها را به observer ها (توابع) گوش فرا دهنده و ثبت شده، منتشر (propagate) و اعمال نمایید.
در واقع RxJava امکان پیاده سازی وظایف غیرهمزمان را به راحتی برای توسعه دهنده فراهم می آورد.
RxJava پیاده سازی از ReativeX برای Java می باشد که NetFlix توسعه داده و در بین برنامه نویسان جاوا محبوبیت زیادی پیدا کره است. Rxjava در سال 2014 کد باز (open source) شده و تحت آدرس http://reactivex.io/ میزبانی می شود. ورژن جاوایی آن تحت https://github.com/ReactiveX/RxJava قابل دسترسی می باشد. این کتابخانه تحت لیسانس Apache 2.0 منتشر می شود.
RxJava خود را تحت عنوان یک API که برای برنامه نویسی ناهمزمان طراحی شده (تعریف تسک ها و وظایف ناهمزمان) و از stream observable (آبجکت های observable اطلاعاتی را منتشر کرده و آن ها را به توابع گوش فرادهنده به تغییرات که همان observer ها هستند ارسال می کنند) استفاده می کند، معرفی می نماید.
آموزش اضافه کردن کتابخانه RxJava 2.0
از زمان نگارش این مقاله، ویرایش جدیدتری از کتابخانه ی RxJava ارائه شده است. مقدار g.a.y را با 2.0.1 یا نسخه ی جدیدتر جایگزین نمایید.
برای افزودن کتابخانه ی مزبور ویژه ی سیستم کامپایل Maven، می توانید تکه کد زیر را در بخش dependency ها اضافه نمایید.
io.reactivex.rxjava2 rxjava g.a.v
برای سیستم کامپایل Gradle، می توانید RxJava را از طریق دستور زیر اضافه نمایید.
compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'
آموزش برنامه نویسی ناهمزمان (Async)
امروزه برنامه نویسی به سبک دستوری/imperative طوری که اپلیکیشن در لحظه تنها یک عملیات را پردازش کند (single-thread) دیگر به هیچ وجه کارامد نیست و ممکن است UI های unresponsive را مسدود کند و در نهایت تجربه ی کاربری ضعیفی را در پی داشته باشد.
با مدیریت رخدادهای پیش بینی نشده به صورت ناهمزمان (async)، می توان از این اتفاق جلوگیری کرد. برای مثال، چنانچه لازم باشد منتظر فراخوانی وب سرویس یا کوئری دیتابیس باشید، اگر شبکه پاسخگو نبود (responsive)، قطعا اپلیکیشن هنگ خواهد کرد.
مثالی در این زمینه:
public ListgetTodos() { List todosFromWeb = // query a webservice (with bad network latency) return todosFromDb; }
فراخوانی متد getTodos() از thread اصلی یا یک UI thread سبب می شود تا زمان رسیدن todosFromWeb، اپلیکیشن nonresponsive بوده و تجربه ی کاربری ضعیفی برای کاربر رقم خورده شود.
برای بهبود کارایی این query که مدت زمان اجرای آن مشخص نیست، می بایست آن را در یک thread دیگر اجرا نموده و سپس به هنگام رسیدن نتیجه، thread اصلی را مطلع نمایید.
public void getTodos(Consumer<>> todosConsumer) { Thread thread = new Thread(()-> { List todosFromWeb = // query a webservice todosConsumer.accept(todosFromWeb); }); thread.start(); }
حال پس از فراخوانی متد getTodos(Consumer<>
RxJava از مفهوم Observable ها و Observer ها استفاده می کند. Observer ها توابعی هستند که با گوش فراخوانی و subscribe کردن به Observable ها که آبجکت هستند، از تغییرات رخ داده بلافاصله آگاه شده و آن تغییرات را در خود منعکس می نمایند. Observer ها به مجرد اینکه یک observable مقداری را emit (ارسال) می کند، از آن مطلع می شوند. همچنین زمانی که observable اطلاعاتی را مبنی بر اینکه دیگر مقادیری وجود ندارد ارسال می کند، observer ها از آن مطلع می شوند. زمانی هم که observable با خطایی مواجه می شود، متد onError(Throwable e) خورده شده و باز observer از آن آگاه می شود. توابع مربوطه onNext()، onError() و onCompleted() هستند که همگی از interface ای به نام Observer ارث بری می شوند. آبجکت یا نمونه ای از Subscription بیانگر اتصال و ارتباط بین یک observer و observable است. اگر متد unsubscribe() را بر روی این نمونه صدا بزنید، observable از عضویت متد بیرون آمده و اتصال بین آن ها قطع می شود. این متد می تواند زمانی مفید باشد که شما می خواهید بروز رسانی و آپدیت یک المان رابط کاربری یا widget خاتمه یافته (dispose) و از حافظه حذف شود. می توانید thread ای که observable در آن قرار است اجرا شود را با فراخوانی متد subscribeOn() تعریف نمایید و thread ای که observer ها در آن اجرا می شوند را با اجرای متد observeOn() اعلان کنید. می توانید عملیاتی را بر روی observer های خود ثبت کنید که به شما اجازه می دهد نتیجه و emission یک observable را پیش از ارسال آن به observer ویرایش و دستکاری نمایید. متد map به شما این امکان را می دهد یک Func1 ثبت کنید که ورودی را ترجمه و تبدیل می کند. با فراخوانی متد debounce(delay, TimeUnit.MILLISECONDS) بر روی observer، می توانید اعلان کنید که observer تنها زمانی تغییرات را ارسال یا به عنوان نتیجه ارسال (emit) نماید که مقدار مورد نظر پس از گذشت مدت زمان از پیش تعیین شده تغییر نکرده باشد. RxJava متدهای زیادی برای ایجاد observable در اختیار توسعه دهنده قرار می دهد. این متدها عبارت اند از: تابع Observable.just() یک Observable ایجاد می کند و زمانی که یک observer به این Observable گوش می دهد یا برای آن subscribe می کند، متد onNext() از Observer با آرگومان ورودی Observable.just() فراخوانی می شود. در زیر مثال ساده ای از RxJava را مشاهده می کنید که نوعی عملیات تبدیل در آن رخ داده است.
Subject ها در واقع یک نوع پل ارتباطی یا پیشکار (proxy) هستند که خود هر دو نقش Observable و Observer را یکجا ایفا می کنند. Subject ها را می توان به عنوان یک pipe یا پل ارتباطی که داده ها را تبدیل و ترجمه می کنند در نظر گرفته و مورد استفاده قرار داد. به عنوان مثال می توان به PublishSubject اشاره کرد. همین که داده ای به PublishSubject ارسال می شود، بلافاصله این داده به بیرون ارسال می گردد.
مدل برنامه نویسی reactive برای برقراری ارتباط دو طرفه و واکنشی از دو مفهوم Observer ها و Observable ها استفاده می کند. نحوه ی استفاده از نمونه ی کلاس Single در قطعه کد زیر به نمایش گذاشته شده است. گوش فراخوان ها یا subscriber هایی که به آبجکت مربوطه متصل بوده و از تغییرات آن آگاه می شوند، طبیعتا قرار نیست تا ابد به همین کار ادامه دهند. برای مثال هم ممکن است به خاطر تغییر کوچکی در وضعیت، event ارسالی از کلاس Observable دیگر مورد نیاز و درخور توجه نباشد. کلاس Single و دیگر کلاس های observable توابع متعددی جهت subscribe و گوش فراخوانی به تغییرات ارائه می دهند که همگی در خروجی یک آبجکت Disposable بازگردانی می نمایند. به هنگام کار با چندین subscription (نشانگر ارتباط دو طرفه بین observable و observer) که ممکن است به دلیل تغییر در وضعیت، رخداد ارسالی از آن ها دیگر لازم و جالب توجه نباشد، می توان با بهره گیری از کلاس CompositeDisposable همزمان چندین subscription را دور انداخته و به اصطلاح از حافظه پاک نمود (dispose).
هنگام کار با چند Observable و فراخوانی های ناهمزمان تمامی subscription های یک observable (observer هایی که برای گوش دادن به آن ثبت شده اند) ضروری نبوده و منابع زیادی را مصرف می کند. تکه کد بعدی از متد cache استفاده می کند، به همین خاطر آبجکت Single، پس از اینکه ارسال برای بار اول با موفقیت انجام شد، در حافظه ی موقت ذخیره می شود و از وب سرویس تنها یک بار کوئری گرفته می شود.
در RxJava ممکن است به کرات با شرایطی مواجه شوید که در آن تعداد آیتم های ارسالی از یک Observable بیشتر از میزانی است که یک Observer قادر به استفاده از آن هست. در چنین شرایطی این مسئله مطرح می شود که چگونه می توان این انباشت آیتم های استفاده نشده را مدیریت نمود. به این رخداد در اصطلاح برنامه نویسی back pressure گفته می شود.
می توان به راحتی یک نوع از RxJava را به نوع دیگر تبدیل نمود. کتابخانه ی RxJava یک کلاس به نام TestSubscriber در اختیار توسعه دهنده قرار می دهد که با استفاده از آن می توان یک observable را تست نمایید. RxJava مکانیزمی را در اختیار شما قرار می دهد که به واسطه ی آن می توانید scheduler های ارائه (exposed) شده را طوری بازنویسی نمایید که به صورت همزمان (sync) فراخوانی شوند. برای مثالی در این زمینه می توانید به آدرس http://fedepaol.github.io/blog/2015/09/13/testing-rxjava-observables-subscriptions/ مراجعه نمایید. آموزش Observable ها، Observer ها و Subscription ها
آموزش thread اجرا و یک thread برای گوش دادن به تغییرات (observe)
آموزش Operator ها
آموزش استفاده از delay و به تعویق انداختن تولید و ارسال نتیجه
آموزش ایجاد Observable ها و Observer ها
به منظور ساخت observer ها:
مثالی از فراخوانی متد Observable.just()
import java.util.Arrays;
import java.util.List;
import rx.Observable;
public class RxJavaExample {
public static void main(String[] args) {
List
آموزش انجام عملیات تبدیل
import java.util.Arrays;
import java.util.List;
import rx.Observable;
public class RxJavaExample {
public static void main(String[] args) {
List
آموزش Subject ها
به عبارت دیگر Subject یک آبجکت در RxJava هست که از تمامی ویژگی ها و property های Observable و Observer/Subscriber برخوردار می باشد. هم می تواند به Observable ها گوش داده و از آن ها داده تحویل بگیرد و هم قادر است داده ها را به Observer هایی که به آن گوش می دهند، داده به عنوان خروجی ارسال کند. همچنین می تواند داده هایی را با فراخوانی مستقیم متد onNext()ارسال کند. از این طریق، Subject می تواند نقش واسط را ایفا کند که داده ها را دریافت کرده، عملیاتی را بروی آن انجام می دهد و سپس آن را به گوش فراخوان یا subscriber های خود ارسال می کند.
آموزش آبجکت Single یا همان Promise (مکان نگهدار مقدار مورد نظر)
امروزه استفاده از promise ها در فراخوانی های ناهمزمان (async) به ویژه همراه با زبان سمت سرویس گیرنده ی JavaScript، از محبوبیت ویژه و کاربرد فراوانی برخودار شده است. Promise در اصل جانگهدار یا placeholder برای یک مقدار دلخواه است که observer ها یا متدهای گوش فرا دهنده ی خود را به مجرد دریافت مقدار مورد انتظار و به اصطلاح موعود با خبر می سازد.
RxJava نیز آبجکت یا نوعی به نام Single دارد که از لحاظ کاربرد بسیار شبیه به Promise است.
نمونه یا آبجکت هایی که از کلاس Single ساخته می شود در واقع وظیفه ای مشابه وظیفه ی آبجکت Observable ایفا می نماید، با این تفاوت که دو متد بازفراخوان بیشتر به نام های onSuccess() و onError() ندارد.
import io.reactivex.Single;
public Single<>
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
Single<>
آموزش دور انداختن subscription ها (قطع اتصال بین observer و observable) و استفاده از کلاس CompositeDisposable
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
Single<>
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;
CompositeDisposable compositeDisposable = new CompositeDisposable();
Single<>
آموزش ذخیره ی موقتی مقدار Observable های ارسال شده (completed obervables)
Observable ها معمولا به بخش های مختلف اپلیکیشن پاس داده می شوند و هر بار لازم نیست با اضافه شدن یک subscription (و گوش دادن یک observer به observable) این فراخوانی سنگین اتفاق بیافتد.
کد زیر چهار با از یک وب سرویس کوئری می گیرد، هر چند یک بار کوئری گرفتن هم کفایت می کند. در واقع همان آبجکت های Todo نمایش داده می شوند اما هر بار به روش های مختلف.
Single<>
Single<>
آموزش استفاده از Flowable
RxJava 2.0 یک نوع جدید به نام Flowable
در RxJava 1.0 این مفهوم کمی دیر به نوع Observable
جدا از نوع Observableتبدیل نوعی به نوع دیگر
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…)
elementAt()
firstElement()
lastElement()
singleElement()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…)
elementAt()
firstElement()
lastElement()
singleElement()
sequenceEqual()
toSingleDefault()
آموزش و تست Observable و Subscription های RxJava
آموزش و تست observable ها
Observable