В этой статье я буду собирать интересные и полезные решения, которые реализованы с помощью RxJava. Это будут решения как из моей практики, так и из различных статей и книг. Если у вас есть, чем поделиться, или какие-то вопросы - пишите в наш Telegramm чат RxJava.

 

 

 

Общая информация

Материал рассчитан на тех, кто уже имеет какой-то опыт работы с RxJava. Если же вы пока совсем ничего не знаете по этой теме, посмотрите два первых урока моего курса: Урок 1 и Урок 2. По ним вы сможете примерно понять механизмы RxJava. А в последующих уроках курса рассмотрены уже более продвинутые вещи.

В примерах использовалась RxJava версии 2. Но в целом будет работать и на первой версии.

Код примеров представлен в двух вариантах: полном и с лямбдами, чтобы каждый мог смотреть примеры в удобном ему варианте.

Статья будет периодически пополняться новыми примерами.

 

 

Список примеров: 

От списка к отдельным элементам (flatMap, fromIterable)

Использование одного из нескольких источников (concat, first, filter)

Данные второго Observable зависят от данных первого Observable (flatMap, fromIterable)

Данные второго Observable зависят от данных первого Observable и соединяются друг с другом в общем Observable (flatMap, fromIterable)

Соединение данных из нескольких Observable в один общий (flatMap, fromIterable, zip)

Обработка ввода текста в строку поиска (switchMap, debounce)

Повтор при ошибке (retryWhen, take, delay, range, zip, just, error, flatMap)

Периодический повтор операции (repeatWhen, delay)

 

 

  

 

От списка к отдельным элементам

Используемые операторы: flatMap, fromIterable

 

У нас есть метод getUsers, который возвращает список пользователей.

Observable<List<User>> getUsers();

 

Нам нужно от списка пользователей перейти к отдельным пользователям. Т.е. из Observable<List<User>> получить Observable<User>, чтобы дальше работать с отдельными пользователями. Для этого можно использовать flatMap и fromIterable.

getUsers()
       .flatMap(new Function<List<User>, ObservableSource<User>>() {
           @Override
           public ObservableSource<User> apply(List<User> users) throws Exception {
               return Observable.fromIterable(users);
           }
       })
       .subscribe(new Consumer<User>() {
           @Override
           public void accept(User user) throws Exception {
               saveUser(user);
           }
       });

Метод fromIterable возьмет список и создаст из него Observable с отдельными элементами списка, т.е. из List<User> мы получим Observable<User>.

Оператор flatMap раскроет получившийся Observable<User> и запостит его элементы далее в поток .

В итоге, в метод saveUser будут приходить отдельные объекты User.

 

Лямбда версия того же кода:

getUsers()
       .flatMap(users -> Observable.fromIterable(users))
       .subscribe(user -> saveUser(user));

 

 

  

 

Использование одного из нескольких источников

Используемые операторы: concat, firstfilter

 

Это вполне распространенный сценарий, когда нам необходимо получать данные. Мы смотрим сначала в кэш, если там пусто, то смотрим в БД, если и там пусто, то идем на сервер.

 

У нас есть три репозитория, которые возвращают нам список пользователей. Мы получаем от них три Observable.

Observable<List<User>> cacheUsers = cacheRepository.getUsers();
Observable<List<User>> dbUsers = databaseRepository.getUsers();
Observable<List<User>> networkUsers = networkRepository.getUsers();

Если данные в репозитории есть, то Observable отправит их нам в (onNext) и завершит работу (onCompleted). Если же данных нет, то Observable сразу вызовет onCompleted.

 

Соединяем три Observable с помощью concat.

Observable.concat(cacheUsers, dbUsers, networkUsers)
       .first(Collections.emptyList())
       .subscribe(new Consumer<List<User>>() {
           @Override
           public void accept(List<User> users) throws Exception {
               showUsers(users);
           }
       });

Оператор concat будет последовательно получать данные из этих Observable и передавать их дальше. Т.е. сначала пойдут данные из cacheUsers, затем из dbUsers, затем из networkUsers. Если в каком-то из Observable нет данных, он будет просто пропущен.

Может так случиться, что во всех трех Observable будут данные. И мы получим их всех. Чтобы избежать этого и получить только одни данные, мы используем оператор first. Он пропустит только первые данные, а затем завершит всю цепочку.

Т.е. если в cacheUsers были данные, мы получим их, а остальное (dbUsers и networkUsers) будет проигнорировано.

Если в cacheUsers данных не было, но они были в dbUsers, то мы получим их, а networkUsers будет проигнорирован.

А если cacheUsers и dbUsers были пусты, то мы получим данные из networkUsers.

Если все три Observable ничего не вернули, то мы получим пустой список, который мы указали, как дефолтное значение, в операторе first.

 

Лямбда запись:

Observable.concat(cacheUsers, dbUsers, networkUsers)
       .first(Collections.emptyList())
       .subscribe(users -> showUsers(users));

 

 

Может быть так, что Observable в случае отсутствия данных выполняет не onCompleted, а onNext с пустым списком. Тогда concat из предыдущего примера вернет нам этот пустой список. Это неправильно. Нам надо игнорировать пустой список и смотреть следующий репозиторий.

В этом случае нам поможет оператор filter, который не пропустит пустые списки.

Observable.concat(cacheUsers, dbUsers, networkUsers)
       .filter(new Predicate<List<User>>() {
           @Override
           public boolean test(List<User> users) throws Exception {
               return !users.isEmpty();
           }
       })
       .first(Collections.emptyList())
       .subscribe(new Consumer<List<User>>() {
           @Override
           public void accept(List<User> users) throws Exception {
               showUsers(users);
           }
       });

  

Лямбда запись:

Observable.concat(cacheUsers, dbUsers, networkUsers)
       .filter(users -> !users.isEmpty())
       .first(Collections.emptyList())
       .subscribe(users -> showUsers(users));

 

 

  

 

Данные второго Observable зависят от данных первого Observable

Используемые операторы: flatMap, fromIterable

 

В репозитарии есть два метода: получение списка пользователей и получение детальной информации по пользователю.

Observable<List<User>> getUsers();
Observable<UserDetails> getUserDetails(User user);

 

Необходимо для всех пользователей из списка getUsers получить детальные данные методом getUserDetails.

networkRepository.getUsers()
       .flatMap(new Function<List<User>, ObservableSource<User>>() {
           @Override
           public ObservableSource<User> apply(List<User> users) throws Exception {
               return Observable.fromIterable(users);
           }
       })
       .flatMap(new Function<User, ObservableSource<UserDetails>>() {
           @Override
           public ObservableSource<UserDetails> apply(User user) throws Exception {
               return networkRepository.getUserDetails(user);
           }
       })
       .subscribe(new Consumer<UserDetails>() {
           @Override
           public void accept(UserDetails userDetails) throws Exception {
               saveUserDetails(userDetails);
           }
       });

Первый flatMap разделит список, полученный из getUsers, на отдельных пользователей.

Далее следующий flatMap вызовет метод getUserDetails для каждого пользователя, и дальше в поток пойдут уже объекты UserDetails.

В методе subscribe мы получаем UserDetails и выполняем необходимую операцию.

 

Лямбда запись:

networkRepository.getUsers()
        .flatMap(users -> Observable.fromIterable(users))
        .flatMap(user -> networkRepository.getUserDetails(user))
        .subscribe(userDetails -> saveUserDetails(userDetails));

 

 

  

 

Данные второго Observable зависят от данных первого Observable и соединяются друг с другом в общем Observable

Используемые операторы: flatMapfromIterable

 

В репозитарии есть два метода: получение списка пользователей и получение детальной информации по пользователю.

Observable<List<User>> getUsers();
Observable<UserDetails> getUserDetails(User user);

 

Нам снова необходимо для всех пользователей из списка getUsers получить детальные данные методом getUserDetails. Но теперь нам надо еще и соединить вместе User и UserDetails. Т.е. у класса User есть метод:

setDetails(UserDetails userDetails)

и для каждого объекта User из списка getUsers нам надо передать в этот метод полученный UserDetails.

 

Это делается расширенной версией оператора flatMap

networkRepository.getUsers()
       .flatMap(new Function<List<User>, ObservableSource<User>>() {
           @Override
           public ObservableSource<User> apply(List<User> users) throws Exception {
               return Observable.fromIterable(users);
           }
       })
       .flatMap(
               new Function<User, ObservableSource<UserDetails>>() {
                   @Override
                   public ObservableSource<UserDetails> apply(User user) throws Exception {
                       return networkRepository.getUserDetails(user);
                   }
               }, new BiFunction<User, UserDetails, User>() {
                   @Override
                   public User apply(User user, UserDetails userDetails) throws Exception {
                       user.setDetails(userDetails);
                       return user;
                   }
               })
       .subscribe(new Consumer<User>() {
           @Override
           public void accept(User user) throws Exception {
               saveUser(user);
           }
       });

Первый flatMap разделит список, полученный из getUsers, на отдельных пользователей.

У следующего flatMap мы используем две функции. Первая вызовет метод getUserDetails для каждого объекта User и получит объект UserDetails. Оба этих объекта (User и UserDetails) пойдут во вторую функцию, и тем самым нам дается возможность соединить их. В нашем примере, мы вызываем метод user.setDetails и возвращаем этот же User объект. Он пойдет дальше в поток.

В методе subscribe мы получаем User (с присоединенным к нему UserDetails) и выполняем необходимую операцию.

 

Лямбда запись

networkRepository.getUsers()
       .flatMap(users -> Observable.fromIterable(users))
       .flatMap(
               user -> networkRepository.getUserPhoto(user),
               (user, userPhoto) -> {
                   user.setPhoto(userPhoto);
                   return user;
               })
       .subscribe(user -> saveUser(user));

 

 

  

 

Соединение данных из нескольких Observable в один общий

Используемые операторы: flatMapfromIterable, zip

 

В репозитарии есть три метода: получение списка пользователей, получение детальной информации по пользователю и получение фото пользователя.

Observable<List<User>> getUsers();
Observable<UserDetails> getUserDetails(User user);
Observable<UserPhoto> getUserPhoto(User user);

 

Нам необходимо для каждого пользователя из списка getUsers получить детальные данные и фото. Для хранения данных пользователя будем использовать класс UserData.

Сначала отдельно напишем метод getUserData, где будем для User получать UserDetails и UserPhoto и складывать их в UserData.

private ObservableSource<UserData> getUserData(User user) {
   return Observable.zip(networkRepository.getUserDetails(user), networkRepository.getUserPhoto(user),
           new BiFunction<UserDetails, UserPhoto, UserData>() {
               @Override
               public UserData apply(UserDetails userDetails, UserPhoto userPhoto) throws Exception {
                   return new UserData(userDetails, userPhoto);
               }
           });
}

В оператор zip мы передаем два Observable, которые вернут UserDetails и UserPhoto для указанного пользователя. Получив эти данные, zip предоставит нам возможность соединить их в один общий объект UserData и создаст Observable с этим объектом.

 

Лямбда запись

private ObservableSource<UserData> getUserData(User user) {
   return Observable.zip(networkRepository.getUserDetails(user), networkRepository.getUserPhoto(user),
           (userDetails, userPhoto) -> new UserData(userDetails, userPhoto));
}

 

В методе getUserData мы соединили два Observable в один. Теперь используем этот метод в итоговом Observable.

networkRepository.getUsers()
       .flatMap(new Function<List<User>, ObservableSource<User>>() {
           @Override
           public ObservableSource<User> apply(List<User> users) throws Exception {
               return Observable.fromIterable(users);
           }
       })
       .flatMap(new Function<User, ObservableSource<UserData>>() {
           @Override
           public ObservableSource<UserData> apply(User user) throws Exception {
               return getUserData(user);
           }
       })
       .subscribe(new Consumer<UserData>() {
           @Override
           public void accept(UserData userData) throws Exception {
               saveUserData(userData);
           }
       });

Первый flatMap разделит список, полученный из getUsers, на отдельных пользователей.

Далее следующий flatMap вызовет метод getUserData для каждого пользователя и дальше в поток пойдут уже объекты UserData содержащие UserDetails и UserPhoto.

В методе subscribe мы получаем UserData и выполняем необходимую операцию.

 

Лямбда запись

networkRepository.getUsers()
       .flatMap(users -> Observable.fromIterable(users))
       .flatMap(user -> getUserData(user))
       .subscribe(userData -> saveUserData(userData));

 

 

  

 

Обработка ввода текста в строку поиска

Используемые операторы: switchMapdebounce

 

У нас есть экран поиска пользователей. По мере ввода текста в строку поиска нам необходимо запускать метод, который вернет нам список пользователей, удовлетворяющих запросу.

Пусть это будет метод searchUsers:

Observable<List<User>> searchUsers(String query);

На вход принимает строку и возвращает Observable со списком.

 

Чтобы по мере ввода символов в поле запроса мы могли получать полный текст запроса как Observable<String>, мы будем использовать Subject<String>.

PublishSubject<String> subject = PublishSubject.create();

А на EditText нам надо повесить TextWatcher, который будет ловить ввод новых символов и передавать полный текст запроса в subject. 

Либо вы можете просто использовать библиотеку RxBinding.

 

В этом сценарии есть пара нюансов, которые нам надо учитывать.

Во-первых, если пользователь будет быстро набирать текст, то мы будем запускать поиск при вводе каждого нового символа и вполне может получиться так, что будет несколько запусков в секунду. Это не очень хорошая практика, особенно если поиск выполняется на сервере.

Лучше мы будем после каждого символа ждать. Если прошла, например, одна секунда, значит пользователь скорее всего закончил ввод, и мы действительно можем запускать поиск.

Второй нюанс обсудим после просмотра кода.

 

Обработка текста из subject будет выглядеть так:

subject.debounce(1000, TimeUnit.MILLISECONDS)
       .switchMap(new Function<String, ObservableSource<List<User>>>() {
           @Override
           public ObservableSource<List<User>> apply(String query) throws Exception {
               return repository.searchUsers(query);
           }
       })
       .subscribe(new Consumer<List<User>>() {
           @Override
           public void accept(List<User> users) throws Exception {
               showUsers(users);
           }
       });

Оператор debounce при получении нового текста будет делать паузу в одну секунду. Если в течение этой секунды больше не будет новых данных, то последнее полученное значение пойдет дальше. Если же в течение этой секунды пришел новый текст, то предыдущий текст никуда не пойдет, а debounce снова сделает паузу в одну секунду. И т.д.

В переводе на нормальный язык - debounce будет отсеивать передаваемые ему данные, если между ними пауза меньше, чем указанное время. Это как раз то, что нам нужно, чтобы отсеять излишние срабатывания поиска.

Далее мы используем switchMap. Это аналог flatMap, но работает чуть по-другому.

 

Давайте предположим, что поиск работает примерно полторы секунды. Посмотрим на хронологию событий, как все происходило бы, если бы мы использовали flatMap.

0 мс. Пользователь вводит в строку поиска символ А. Оператор debounce получает его и включает паузу.

1000 мс - debounce выждал паузу и передает символ дальше. Запускается поиск по строке А. Т.е. flatMap запускает searchUsers(“А”) и подписывается на его Observable, чтобы получить результат поиска и передать его дальше в поток.

1100 мс - пользователь вводит символ Б. Оператор debounce получает текст АБ и включает паузу.

2100 мс - debounce выждал паузу и передает текст АБ дальше. Запускается поиск по строке АБ. flatMap запускает searchUsers(“АБ”) и подписывается на его Observable, чтобы получить результат поиска и передать его дальше в поток.

2500 мс - отработал поиск по строке А и вернул результаты. flatMap получил их и отправил дальше - в метод showUsers, и мы отобразили их на экране.

3600 мс - отработал поиск по строке АБ и вернул результаты. flatMap получил их и отправил дальше - в метод showUsers, и мы отобразили их на экране.

Обратите внимание что с 2500 по 3600 мс экран будет отображать результаты поиска по строке А, хотя в строке поиска в этом время уже был текст АБ. Это неправильно.

 

Нам нужно игнорировать результаты прошлого поиска, как только мы запускаем новый поиск. Для этого мы используем switchMap, а не flatMap. Хронология тех же событий, но с оператором switchMap будет выглядеть так:

0 мс. Пользователь вводит в строку поиска символ А. Оператор debounce получает его и включает паузу.

1000 мс - debounce выждал паузу и передает символ дальше. Запускается поиск по строке А. Т.е. switchMap запускает searchUsers(“А”) и подписывается на его Observable, чтобы получить результат поиска и передать его дальше в поток.

1100 мс - пользователь вводит символ Б. Оператор debounce получает текст АБ и включает паузу.

2100 мс - debounce выждал паузу и передает текст АБ дальше. Запускается поиск по строке АБ. switchMap запускает searchUsers(“АБ”) и подписывается на его Observable, чтобы получить результат поиска и передать его дальше в поток. Но! При этом switchMap отпишется от прошлого Observable, который он получил от searchUsers(“А”).

2500 мс - отработал поиск по строке А и вернул результаты. Но switchMap уже отписался от этого поиска и результат просто никуда не пойдет.

3600 мс - отработал поиск по строке АБ и вернул результаты. Они пошли дальше в метод showUsers и мы отобразили их на экране.

 

Т.е. switchMap в отличие от flatMap будет отписываться от прошлых поисков при запуске нового.

 

Лямбда запись

subject.debounce(1000, TimeUnit.MILLISECONDS)
       .switchMap(query -> repository.searchUsers(query))
       .subscribe(users -> showUsers(users));

 

 

  

 

Повтор при ошибке

Используемые операторы: retryWhen, take, delay, range, zip, just, error, flatMap

 

У нас есть метод getUsers, который возвращает список пользователей.

Observable<List<User>> getUsers();

 

Мы можем настроить Observable так, чтобы при ошибке он перезапускался определенное количество раз и через определенное время.

Для этого используется оператор retryWhen

repository.getUsers()
       .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
           @Override
           public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
               return throwableObservable.take(3).delay(1, TimeUnit.SECONDS);
           }
       })
       .subscribe(...);

Если из метода getUsers придет ошибка, то этот метод будет перезапущен через одну секунду. Если при перезапуске снова придет ошибка, то он будет еще раз перезапущен через одну секунду. В общем, он будет перезапускаться, пока не вернет успешный результат или количество попыток не достигнет трех.

Временной интервал мы указали в операторе delay, а количество попыток - в take.

 

Описать принцип работы оператора retryWhen достаточно сложно. В моем курсе RxJava есть отдельный урок по retry операторам и там я все подробно объясняю.

Если вкратце, то throwableObservable, который мы получаем в функции в retryWhen, - это Observable, куда будут приходить ошибки из getUsers. От нас требуется вернуть, как результат работы функции, Observable, который будет использован, как триггер перезапуска метода getUsers.

В нашем примере мы берем throwableObservable, добавляем к нему take и delay и возвращаем как результат работы функции. Соответственно, первые три (оператор take) ошибки из getUsers будут отложенным (оператор delay) сигналом к перезапуску getUsers.

 

Лямбда запись

repository.getUsers()
       .retryWhen(throwableObservable -> throwableObservable.take(3).delay(1, TimeUnit.SECONDS))
       .subscribe(...);

 

 

Но в этой схеме есть недостаток. Когда будет четвертая ошибка, мы не получим ее в onError. Вместо этого придет onComplete. И мы даже не узнаем, что что-то пошло не так.

Это исправляется следующим образом:

repository.getUsers()
       .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
           @Override
           public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
               return throwableObservable
                       .zipWith(Observable.range(1, RETRY_COUNT), new BiFunction<Throwable, Integer, Observable>() {
                           @Override
                           public Observable apply(Throwable throwable, Integer integer) throws Exception {
                               if (integer < RETRY_COUNT) {
                                   return Observable.just(0L);
                               } else {
                                   return Observable.error(throwable);
                               }
                           }
                       }).flatMap(new Function<Observable, ObservableSource<?>>() {
                           @Override
                           public ObservableSource<?> apply(Observable observable) throws Exception {
                               return observable;
                           }
                       });
           }
       })
       .subscribe(...);

Содержимое функции в retryWhen стало сложнее. Но весь этот код просто прокидывает ошибку из throwableObservable на верхний уровень, когда количество попыток достигает установленного максимума. И в итоге мы получим эту ошибку в onError обработчике в subscribe.

 

Лямбда запись

repository.getUsers()
       .retryWhen(throwableObservable -> throwableObservable
               .zipWith(Observable.range(1, RETRY_COUNT), (BiFunction<Throwable, Integer, Observable>) (throwable, integer) -> {
                   if (integer < RETRY_COUNT) {
                       return Observable.just(0L);
                   } else {
                       return Observable.error(throwable);
                   }
               }).flatMap(observable -> observable))
       .subscribe(...);

 

 

  

 

Периодический повтор операции

Используемые операторы: repeatWhendelay

 

У нас есть метод getUsers, который возвращает список пользователей.

Observable<List<User>> getUsers();

 

Нам необходимо, чтобы данные загружались с сервера раз в минуту и сохранялись в БД. Для повтора операции мы можем использовать оператор repeatWhen

repository.getUsers()
       .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
           @Override
           public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
               return objectObservable.delay(1, TimeUnit.MINUTES);
           }
       })
       .subscribe(new Consumer<List<User>>() {
           @Override
           public void accept(List<User> users) throws Exception {
               updateUsers(users);
           }
       });

Механизм repeatWhen похож на рассмотренный в предыдущем примере retryWhen. В функции мы получаем objectObservable, который будет постить void, когда из getUsers придет onComplete. Из objectObservable мы можем сделать Observable, элементы которого будут триггером для повторного запуска getUsers. Мы добавляем оператор delay с минутной задержкой. Это значит, что через минуту после каждого onComplete, пришедшего из getUsers, метод getUsers будет перезапущен.

  

Лямбда запись:

repository.getUsers()
       .repeatWhen(objectObservable -> objectObservable.delay(1, TimeUnit.MINUTES))
       .subscribe(users -> updateUsers(users));

 

 

Если вам необходимо количественно ограничить количество повторов, то используйте take для objectObservable.

objectObservable.delay(1, TimeUnit.MINUTES).take(5)

 

Если вам необходимо остановить повтор при получении каких-либо данных, то добавьте takeUntil

repository.getUsers()
        .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                return objectObservable.delay(1, TimeUnit.MINUTES);
            }
        })
        .takeUntil(new Predicate<List<User>>() {
            @Override
            public boolean test(@NonNull List<User> users) throws Exception {
                return users.isEmpty();
            }
        })
        .subscribe(new Consumer<List<User>>() {
            @Override
            public void accept(List<User> users) throws Exception {
                updateUsers(users);
            }
        });

 Как только придет пустой список, вся цепочка завершится

 

Либо в subscribe используйте полноценный DisposableObserver, в onNext проверяйте ваше условие и, если оно выполняется, вызывайте dispose(). 

.subscribe(new DisposableObserver<List<User>>() {
    @Override
    public void onNext(@NonNull List<User> users) {
        if (users.isEmpty()) {
            dispose();
        }
    }

    @Override
    public void onError(@NonNull Throwable e) {

    }

    @Override
    public void onComplete() {
        log("onComplete ");
    }
})

 

Чтобы остановить всю цепочку извне, просто вызовите dispose для Disposable, полученного из subscribe.

 

 

 

 


Присоединяйтесь к нам в Telegram:

- в канале StartAndroid публикуются ссылки на новые статьи с сайта startandroid.ru и интересные материалы с хабра, medium.com и т.п.

- в чатах решаем возникающие вопросы и проблемы по различным темам: Android, Compose, Kotlin, RxJava, Dagger, Тестирование, Performance 

- ну и если просто хочется поговорить с коллегами по разработке, то есть чат Флудильня




Language

Автор сайта

Дмитрий Виноградов

Подробнее можно посмотреть или почитать.

Никакие другие люди не имеют к этому сайту никакого отношения и просто занимаются плагиатом.

Социальные сети

 

В канале я публикую ссылки на интересные и полезные статьи по Android

В чате можно обсудить вопросы и проблемы, возникающие при разработке



Группа ВКонтакте



Поддержка проекта

Яндекс
410011180491924

WebMoney
R248743991365
Z551306702056

Paypal