RxDart: Magical transformations of Streams

Welcome to part 3 of my series on Flutter Architecture:

  • Introduction
  • Fundamentals of Dart Streams
  • RxDart: Magical transformations of Streams (this post)
  • RxVMS foundations: RxCommand and GetIt
  • RxVMS: Services & Managers
  • RxVMS: The self-responsible widget
  • User Authentication the RxVMS way

This time we will make a shallow dive into the magical realm of Reactive Extensions (Rx). I will focus on the most used functions of Rx and explain their application. If you haven’t read the previous post you really should do this first so that you can follow this one.

RxDart is the Rx implementation for Dart, for which we have Frank Pepermans and Brian Egan to thank. If you have already used Rx in other languages, you might notice that some functions are named a bit differently, but you should easily get accustomed to them.

You can find the code, written as tests, at https://github.com/escamoteur/stream_rx_tutorial/tree/rx_magic

So far we used Streams as a way to get data from one place in our App to another but they can do much more. Let’s have a look at some features that Rx adds to Streams.

Creating Observables

As stated before Observables are the Rx flavour of Streams that offer more features than normal Streams. There are several interesting ways to create them:

From a Stream

Any Stream can be converted to an Observable by passing it to the Observable's constructor:

var controller = new StreamController<String>();

var streamObservable = new Observable(controller.stream);

streamObservable.listen(print);

Periodic events

var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() );

timerObservable.listen(print);

This will create an Observable that emits values forever at a specific rate. So instead of setting up a Timer it might be easier to use this.
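To try this without waiting forever, here is a minimal sketch that uses Stream.periodic from plain dart:async instead of the Observable wrapper. The helper name firstTicks and the use of take() to make the Stream finite are assumptions for illustration only.

```dart
import 'dart:async';

// Hypothetical helper: collects the first [count] ticks of a periodic Stream.
Future<List<int>> firstTicks(int count) {
  // The callback receives a counter that starts at 0 and grows by one per tick.
  return Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i)
      .take(count) // without take() the Stream would never end
      .toList();
}

Future<void> main() async {
  print(await firstTicks(3)); // [0, 1, 2]
}
```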

From a single Value

Sometimes an API demands a Stream/Observable but you just have a simple value. For this the Observable has a factory method just.

var justObservable = Observable<int>.just(42);

justObservable.listen(print);

// this will print : 42

From a Future

  Future<String> asyncFunction() async {
    return Future.delayed(const Duration(seconds: 1), () => "AsyncResult");
  }

  test('Create Observable from Future', () async {
    print('start');

    var fromFutureObservable = Observable.fromFuture(asyncFunction());

    fromFutureObservable.listen(print);

    // Give the Future time to complete before the test ends
    await Future.delayed(Duration(seconds: 2));
  });

Creating an Observable from a Future will wait for the Future to complete and emit the result value of the Future, or null if no value is returned. Another way to create a Stream from a Future is calling asStream() on any Future.
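As a small sketch of the second variant, the following uses only dart:async. The function names fetchGreeting and shoutedGreeting are made up for this example and stand in for any async call.

```dart
import 'dart:async';

// Hypothetical stand-in for a slow async call.
Future<String> fetchGreeting() =>
    Future.delayed(const Duration(milliseconds: 50), () => 'hello');

// asStream() wraps the Future in a single-item Stream,
// so Stream operators such as map() can be applied to it.
Stream<String> shoutedGreeting() =>
    fetchGreeting().asStream().map((text) => text.toUpperCase());

Future<void> main() async {
  await for (final greeting in shoutedGreeting()) {
    print(greeting); // HELLO
  }
}
```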

You may now wonder what’s the sense in converting a Future to an Observable/Stream instead of just awaiting it. Rest assured, this will become clear when we now explore the available functions to manipulate data while it’s “on the Stream”.

Subjects

Subjects are the StreamController of Rx and as you can imagine RxDart implements them using a StreamController under the hood.

They behave a bit differently from StreamControllers:

  • You can listen() directly on a Subject without accessing a Stream property.
  • More than just one subscription is possible and all listening parties will get the same data at the same time.
  • There are three flavours of Subjects that are best explained with examples:

PublishSubjects

They behave like StreamControllers except that multiple listeners are allowed:

var subject = new PublishSubject<String>();

subject.listen((item) => print(item)); 

subject.add("Item1");

// Add a second listener
subject.listen((item) => print(item.toUpperCase())); 

subject.add("Item2");
subject.add("Item3");

// this is only to prevent the testing framework from killing this process before all items on the Stream are processed
await Future.delayed(Duration(seconds: 5));

// This will cancel all Subscriptions
subject.close();

If you run that you will see:

Item1
ITEM2
Item2
ITEM3
Item3
✓ PublishSubject

You can see that the second listener that came late to the party (we will call them late subscribers) missed the first item. If this should not happen you could use a BehaviorSubject.
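The late-subscriber effect can also be reproduced with a broadcast StreamController from plain dart:async. This is only an analogy under that assumption, not how RxDart implements PublishSubject, and the function name lateSubscriberDemo is made up for the example. Each listener writes into its own list so that the result does not depend on the order in which listeners are served.

```dart
import 'dart:async';

// Returns what the early and the late listener received.
Future<List<List<String>>> lateSubscriberDemo() async {
  final controller = StreamController<String>.broadcast();
  final earlyItems = <String>[];
  final lateItems = <String>[];

  controller.stream.listen(earlyItems.add);
  controller.add('Item1');

  // The second listener subscribes after Item1 was added, so it misses it.
  controller.stream.listen(lateItems.add);
  controller.add('Item2');
  controller.add('Item3');

  // Give the events time to be delivered, then clean up.
  await Future<void>.delayed(Duration.zero);
  await controller.close();
  return [earlyItems, lateItems];
}

Future<void> main() async {
  final result = await lateSubscriberDemo();
  print(result[0]); // [Item1, Item2, Item3]
  print(result[1]); // [Item2, Item3]
}
```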

BehaviorSubject

Every new subscriber receives the last received data item:

var subject = new BehaviorSubject<String>();

subject.listen((item) => print(item)); 

subject.add("Item1");
subject.add("Item2");

subject.listen((item) => print(item.toUpperCase())); 

subject.add("Item3");

Will output:

Item1
ITEM2
ITEM3
Item2
Item3
✓ BehaviourSubject

You can see that Item1 is lost for the second subscriber, but it gets Item2. You might be surprised that the second subscriber gets Item3 before the first subscriber gets Item2. This is because the sequence in which subscribers get served is not guaranteed. Still, every subscriber gets the data in the correct order. BehaviorSubject only caches the last received item for late subscribers. If you need more items cached you can use a ReplaySubject, although in most cases this is not needed.

Manipulating data on the fly

[Figure: acme]

The true power of Rx is that it allows you to process/manipulate data while it’s passing down a Stream. Each of the Rx methods returns a new Stream with the resulting data (like the second conveyor belt above), which means you can chain them together into one processing pipeline. This makes them extremely powerful.

Map

If there is any Stream operation that I don’t want to miss any more, then it’s map(). map() takes each passing data item, applies a function to it and pushes the result to the Stream it returns. A simple example:

[Figure: map_to_upper]

var subject = new PublishSubject<String>();

subject.map((item) => item.toUpperCase()).listen(print);

subject.add("Item1");
subject.add("Item2");
subject.add("Item3");

Output:

ITEM1
ITEM2
ITEM3
✓ Map toUpper

But map doesn’t need to return the same data type that it gets as input. The next example will take integers instead of Strings. Additionally we will chain two map transformations:

var subject = new PublishSubject<int>();

subject.map((intValue) => intValue.toString())
    .map((item) => item.toUpperCase())
    .listen(print);

subject.add(1);
subject.add(2);
subject.add(3);

or something like this:

[Figure: map-2]

class DataClass {}

class WrapperClass {
  final DataClass wrapped;

  WrapperClass(this.wrapped);
}

var subject = new PublishSubject<DataClass>();

// Every DataClass item is wrapped into a WrapperClass
subject.map<WrapperClass>((a) => new WrapperClass(a));

One of the most useful applications of .map is when you receive data in some format from a REST API or from a database and you want to convert it to your own objects:

import 'dart:convert';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

class User {
  final String name;
  final String adress;
  final String phoneNumber;
  final int age;

  // In real projects I would recommend some 
  // serializer and not doing that manually
  factory User.fromJson(String jsonString) {
    var jsonMap = json.decode(jsonString);

    return User(
      jsonMap['name'],
      jsonMap['adress'],
      jsonMap['phoneNumber'],
      jsonMap['age'],
    );
  }

  User(this.name, this.adress, this.phoneNumber, this.age);

  @override
  String toString() {
    return '$name - $adress - $phoneNumber - $age';
  }
}

void main() {
  test('Map', () {
    // Some dummy data
    var jsonStrings = [
      '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }',
      '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }',
      '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }',
    ];

    // We simulate a Stream of json strings that we get from some API/Database with a Subject
    // In reality this might look more like some `asyncWebCallFunction().asStream()`
    var dataStreamFromAPI = new PublishSubject<String>();

    dataStreamFromAPI
        .map<User>((jsonString) => User.fromJson(jsonString)) // from here on it's User objects
        .listen((user) => print(user.toString()));


    // Simulate incoming data
    dataStreamFromAPI.add(jsonStrings[0]);
    dataStreamFromAPI.add(jsonStrings[1]);
    dataStreamFromAPI.add(jsonStrings[2]);
  });
}

As a side note, not only Streams but any Iterable offers a map function that you can use for transformations on Lists.
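A minimal sketch of that side note, using only Dart core types. The helper name labelAll is an assumption for illustration.

```dart
// Hypothetical helper: turns a list of numbers into a list of labels.
List<String> labelAll(List<int> numbers) {
  // Iterable.map is lazy; toList() runs the transformation and collects the result.
  return numbers.map((n) => 'Item$n').toList();
}

void main() {
  print(labelAll([1, 2, 3])); // [Item1, Item2, Item3]
}
```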

Where

If you are only interested in certain values that come along the Stream, you can use the .where() function instead of an if statement inside your listen handler. This is more expressive and easier to read:

var subject = new PublishSubject<int>();

subject.where((val) => val.isOdd)
    .listen( (val) => print('This only prints odd numbers: $val'));


subject.where((val) => val.isEven)
    .listen( (val) => print('This only prints even numbers: $val'));


subject.add(1);
subject.add(2);
subject.add(3);

prints:
This only prints odd numbers: 1
This only prints even numbers: 2
This only prints odd numbers: 3

Debounce

This is one of the little Rx gems! Imagine you have a search field that triggers a REST API call if its text is changed. Doing an API call on every single key stroke is expensive, so you would like to only make a call if the user pauses for a moment. This is exactly what debounce() is for: it swallows all incoming events that are not followed by a pause.

var subject = new PublishSubject<String>();


subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s));


subject.add('A');
subject.add('AB');

await Future.delayed(Duration(milliseconds: 200));


subject.add("ABC");
// There is no output yet

await Future.delayed(Duration(milliseconds: 700));

// now we receive the latest value: 'ABC'

So if you convert the onChanged handler of a TextField to an Observable, you can solve this elegantly.

Expand

If your source Stream emits arrays of objects and you want to process each object on its own you can use .expand which will do just that:

[Figure: expand]

You will see an application of this further down in the FireStore example.
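Here is a small sketch of what expand does, using the expand method that plain dart:async Streams already offer. The helper name flatten and the sample data are assumptions for illustration.

```dart
import 'dart:async';

// Hypothetical helper: turns a Stream of lists into a flat list of items.
Future<List<int>> flatten(Stream<List<int>> batches) {
  // expand() emits every element of each incoming list as its own event.
  return batches.expand((batch) => batch).toList();
}

Future<void> main() async {
  final batches = Stream.fromIterable([
    [1, 2],
    [3],
    <int>[], // an empty list simply contributes nothing
    [4, 5],
  ]);
  print(await flatten(batches)); // [1, 2, 3, 4, 5]
}
```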

Merge

If you have multiple different Streams but you want to handle their objects together, you can use .mergeWith (in other Rx implementations just merge), which takes an array of Streams and returns one combined Stream.

[Figure: rx-merge]

.mergeWith does not guarantee any particular order between the items of the two Streams. They get emitted on the resulting Stream as they come in.

For example, if you have two components that both report errors over a Stream and you want both to be displayed in a Dialog, you could do it like this (pseudo code):

@override
initState()
{
  super.initState();

  component1.errors.mergeWith([component2.errors])
    .listen( (error) async => await showDialog(error.message));
}

or if you want a combined display of messages from multiple social networks it could look like (pseudo code)

final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data));
final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data));
final postStream = observableTwitter.mergeWith([observableFacebook]);

Distinct

Imagine two Observable<bool>, isBusyOne and isBusyTwo, that each signal that some part of your App is busy and that you merge into a single Stream to update the UI. It could happen that both issue the same value, which would lead to an update of the UI with the same data. To prevent this we can use .distinct(). It makes sure that data is only passed down the Stream if the value of a new item is different from the last one. The merged getter would then look like this:

  Observable<bool> get isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();

Which also shows that you can chain as many Rx operators as you want! 🙂

In other implementations of Rx distinct is named distinctUntilChanged.
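The effect is easy to check with the distinct method of a plain dart:async Stream. This is a minimal sketch; the helper name withoutRepeats and the sample values are assumptions.

```dart
import 'dart:async';

// Hypothetical helper: drops every value that equals the one directly before it.
Future<List<bool>> withoutRepeats(Stream<bool> source) {
  return source.distinct().toList();
}

Future<void> main() async {
  final busy = Stream.fromIterable([true, true, false, false, true]);
  // Only changes get through; a value may reappear later on.
  print(await withoutRepeats(busy)); // [true, false, true]
}
```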

ZipWith

zipWith also combines one Stream with another. But, unlike .mergeWith, it doesn’t emit data as soon as it receives an item from one of its source Streams. It actually waits until an item from both source Streams has arrived and then combines them using a provided zipper Function:

[Figure: rx-zip]

The signature of zipWith looks a bit scary but we will explore it together:

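// T : the type of the items of the first source (the Observable zipWith is called on)
// R : the type of the resulting Stream/Observable
// S : the type of the second source Stream/Observable
// zipper: a function that takes one item of each source (a T and an S) and combines them into an R
Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s))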

A very basic example:

new Observable.just(1) // .just() creates an Observable that directly emits the passed value
    .zipWith(new Observable.just(2), (one, two) => one + two)
    .listen(print); // prints 3

A more practical application is if you need to wait for two async functions that return a Future and you want to process the data as soon as both results are returned. In this slightly contrived example we imagine two REST APIs, one returning a User, the other a Product as JSON strings and we want to wait for both calls before we return an Invoice object.

class Invoice {
  final User user;
  final Product product;

  Invoice(this.user, this.product);

  printInvoice() {
    print(user.toString());
    print(product.toString());
  }
}

// Simulating an HTTP call to a REST API returning a Product as JSON string
Future<String> getProduct() async {
  print("Started getting product");
  await Future.delayed(Duration(seconds: 2));
  print("Finished getting product");
  return '{"name": "Flux compensator", "price": 99999.99}';
}

// Simulating an HTTP call to a REST API returning a User as JSON string
Future<String> getUser() async {
  print("Started getting User");
  await Future.delayed(Duration(seconds: 4));
  print("Finished getting User");
  return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }';
}

void main() {
  test('zipWith', () async {
    var userObservable =
        Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString));

    var productObservable = Observable.fromFuture(getProduct())
        .map<Product>((jsonString) => Product.fromJson(jsonString));

    Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>(
        productObservable, (user, product) => Invoice(user, product));


    print("Start listening for invoices");
    invoiceObservable.listen((invoice) => invoice.printInvoice());

    // this is only to prevent the testing framework from killing this process before all items on the Stream are processed
    await Future.delayed(Duration(seconds: 5));
  });
}

Looking at the output you can see how asynchronously this is executed:

Started getting User
Started getting product
Start listening for invoices
Finished getting product
Finished getting User
Jon Doe - New York - 424242 - 42
Flux compensator - 99999.99

CombineLatest

combineLatest combines the values of multiple Streams like merge and zip do, but in a slightly different way. It listens to several Streams and emits a combined value whenever a new value arrives from one of them. The interesting part is that it does not only emit the changed value but also the last received values of all other source Streams. Observe this animation closely:

[Animation: combine-latest]

Before combineLatest emits its first value all source streams have to deliver at least one item.

Unlike the methods before, combineLatest isn’t an instance method of Observable but a static method. Also, as Dart does not support function overloading, there are separate versions of combineLatest depending on the number of source streams: combineLatest2...combineLatest9

A nice application of combineLatest is for instance if you have two Observable<bool> that signal that some parts of your App are busy and you want to display a busy spinner if one of them is busy. With combineLatest this can look like (pseudo code):

class Model
{
  Observable<bool> get isBusy => 
    Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2);

  PublishSubject<bool> isBusyOne;
  PublishSubject<bool> isBusyTwo;
}

In your UI you then can consume isBusy with a StreamBuilder to display a Spinner if the received value is true.
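To play with the busy logic outside of Flutter, here is a hand-rolled sketch built only on dart:async. It is not RxDart's implementation: combineLatest2Sketch and busyDemo are hypothetical names, the sketch subscribes to its sources immediately and does not forward errors or handle cancellation. It does show the two rules described above: nothing is emitted until both sources have delivered an item, and afterwards every new item is combined with the latest item of the other source.

```dart
import 'dart:async';

// Simplified stand-in for combineLatest with two sources (assumption, see text).
Stream<R> combineLatest2Sketch<A, B, R>(
    Stream<A> first, Stream<B> second, R Function(A, B) combiner) {
  final controller = StreamController<R>();
  A? lastA;
  B? lastB;
  var hasA = false, hasB = false;
  var openSources = 2;

  void emit() {
    // Only emit once both sources have delivered at least one item.
    if (hasA && hasB) controller.add(combiner(lastA as A, lastB as B));
  }

  void sourceDone() {
    if (--openSources == 0) controller.close();
  }

  first.listen((a) { lastA = a; hasA = true; emit(); }, onDone: sourceDone);
  second.listen((b) { lastB = b; hasB = true; emit(); }, onDone: sourceDone);
  return controller.stream;
}

Future<List<bool>> busyDemo() async {
  final isBusyOne = StreamController<bool>();
  final isBusyTwo = StreamController<bool>();
  final seen = <bool>[];
  combineLatest2Sketch<bool, bool, bool>(
          isBusyOne.stream, isBusyTwo.stream, (b1, b2) => b1 || b2)
      .listen(seen.add);

  // Lets each event travel through the pipeline before the next one is sent.
  Future<void> settle() => Future<void>.delayed(Duration.zero);

  isBusyOne.add(true); // nothing emitted yet: isBusyTwo has no item so far
  await settle();
  isBusyTwo.add(false); // emits true || false
  await settle();
  isBusyOne.add(false); // emits false || false
  await settle();

  await isBusyOne.close();
  await isBusyTwo.close();
  return seen;
}

Future<void> main() async {
  print(await busyDemo()); // [true, false]
}
```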

combineLatest is also a powerful function in combination with FireStore snapshot streams.

Imagine you want to build an App that displays a news ticker together with a weather forecast. Ticker messages and weather data are stored in two different FireStore collections. Both get updated independently from some backend service. You want to display the data updates using a StreamBuilder. With combineLatest this is easy:

class WeatherForecast {
  final String forecastText;
  final GeoPoint location;

  factory WeatherForecast.fromMap(Map<String, dynamic> map) {
    return WeatherForecast(map['forecastText'], map['location']);
  }

  WeatherForecast(this.forecastText, this.location);
}

class NewsMessage {
  final String newsText;
  final GeoPoint location;

  factory NewsMessage.fromMap(Map<String, dynamic> map) {
    return NewsMessage(map['newsText'], map['location']);
  }

  NewsMessage(this.newsText, this.location);
}

class CombinedMessage {
  final WeatherForecast forecast;
  final NewsMessage newsMessage;

  CombinedMessage(this.forecast, this.newsMessage);
}

class Model {
  CollectionReference weatherCollection;
  CollectionReference newsCollection;

  Model() {
    weatherCollection = Firestore.instance.collection('weather');
    newsCollection = Firestore.instance.collection('news');
  }

  Observable<CombinedMessage> getCombinedMessages() {
    Observable<WeatherForecast> weatherForecasts = weatherCollection
        .snapshots()
        .expand((snapShot) => snapShot.documents)
        .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data));

    Observable<NewsMessage> news = newsCollection
        .snapshots()
        .expand((snapShot) => snapShot.documents)
        .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));

    return Observable.combineLatest2(
        weatherForecasts, news, (weather, news) => CombinedMessage(weather, news));
  }
}

In your UI you could then use this like: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).

AsyncMap

Besides map() there is also the asyncMap function which allows you to use an async function as mapping function. Let’s imagine a slightly different setting for our FireStore example above. Now the needed WeatherForecast depends on the location of the NewsMessage and only has to get updated when a new NewsMessage is received:

Observable<CombinedMessage> getDependendMessages() {

  Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) {
    return snapShot.documents;
  }).map<NewsMessage>((document) {
    return NewsMessage.fromMap(document.data);
  });

  return news.asyncMap((newsEntry) async {
    var weatherDocuments =
        await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments();
    return new CombinedMessage(
        WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry);
  });
}

The Observable returned by getDependendMessages will emit a new CombinedMessage each time the newsCollection changes.
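If you want to try asyncMap without a FireStore backend, plain dart:async Streams offer the same method. In this sketch lookupWeather is a hypothetical function that stands in for the async FireStore query, and the city names are sample data.

```dart
import 'dart:async';

// Hypothetical async lookup standing in for the FireStore query.
Future<String> lookupWeather(String city) async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  return 'Sunny in $city';
}

// asyncMap awaits the Future of each item before it emits the result,
// so the results arrive in the same order as the source items.
Stream<String> forecastsFor(Stream<String> cities) =>
    cities.asyncMap(lookupWeather);

Future<void> main() async {
  final cities = Stream.fromIterable(['Berlin', 'Lisbon']);
  print(await forecastsFor(cities).toList());
  // [Sunny in Berlin, Sunny in Lisbon]
}
```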

Debugging Observables

As nice as Rx is when everything works, it can seem almost impossible to debug an expression like this one:

Observable<NewsMessage> news = newsCollection
    .snapshots()
    .expand((snapShot) => snapShot.documents)
    .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));

But keep in mind that => is only a short form for an anonymous function. Using the handy quick action “Convert to block body” on the arrows, you get:

    Observable<NewsMessage> news = newsCollection
        .snapshots()
        .expand((snapShot) {
          return snapShot.documents;
        })
        .map<NewsMessage>((document) {
          return NewsMessage.fromMap(document.data);
        });

Which means we can now set breakpoints or add print statements at every stage of our processing pipeline.

Beware of side effects

This cannot be emphasised enough! If you want to benefit from Rx making your code more reliable, you have to accept that Rx is about transforming data while it moves over the conveyor belt. So never call functions that modify any variables/state outside of the processing pipeline from one of the processing functions; wait until you reach the .listen function. So instead of doing this:

Observable.fromFuture(getProduct())
    .map<Product>((jsonString) {
      var product = Product.fromJson(jsonString);
      database.save(product); // side effect inside map()
      setState(() { _product = product; }); // side effect inside map()
      return product;
    }).listen((_) {});

do this

Observable.fromFuture(getProduct())
        .map<Product>((jsonString) => Product.fromJson(jsonString))
        .listen( (product) {
          database.save(product);  
          setState((){ _product = product; });
        });

The duty of map() is transforming the data on the Stream, NOTHING else! Anything else the passed mapping function does is a side effect, and side effects are hard to spot when reading code and easily hide bugs.

Some thoughts on freeing resources

To avoid any memory leaks, always cancel() your subscriptions and close() your StreamControllers and Subjects as soon as you do not need them anymore.
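A minimal sketch of that clean-up with a plain dart:async StreamController (Subjects are closed the same way). The function name listenThenCleanUp is an assumption; in a Flutter widget the cancel and close calls would typically go into the dispose method of your State.

```dart
import 'dart:async';

Future<List<int>> listenThenCleanUp() async {
  final controller = StreamController<int>();
  final received = <int>[];

  // Keep the subscription so that it can be cancelled later.
  final StreamSubscription<int> subscription =
      controller.stream.listen(received.add);

  controller.add(1);
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  await subscription.cancel(); // stop listening
  await controller.close(); // release the controller
  return received;
}

Future<void> main() async {
  print(await listenThenCleanUp()); // [1]
}
```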

Final thoughts

Wow, congratulations if you stayed with me to this point. I know this is a lot of stuff to digest at once. But you are now not only able to use Rx to make your life easier, you are also prepared for the next posts where we dive into the details of RxVMS.

Actually Rx has a lot more great functions than the ones I showed you here. If you caught the Rx virus and want to explore more, check out http://reactivex.io/documentation/operators.html
On Gitter there is even a support channel for RxDart where you will find help if you get stuck somewhere in your processing pipeline https://gitter.im/ReactiveX/rxdart
