Welcome to part 3 of my series on Flutter Architecture:
- Introduction
- Fundamentals of Dart Streams
- RxDart: Magical transformations of Streams (this post)
- RxVMS foundations: RxCommand and GetIt
- RxVMS: Services & Managers
- RxVMS: The self-responsible widget
- User Authentication the RxVMS way
This time we will make a shallow dive into the magical realm of Reactive Extensions (Rx). I will focus on the most used functions of Rx and explain their application. If you haven't read the previous post you really should do this first so that you can follow this one.
RxDart Rx implementation for Dart for which we have Frank Pepermans and Brian Egan to thank. If you already used Rx in other languages you might notice that some functions differ a bit in their naming but you should easily get accustomed.
You find the code in test
on https://github.com/escamoteur/stream_rx_tutorial/tree/rx_magic
So far we used Streams as a way to get data from one place in our App to another but they can do much more. Let's have a look at some features that Rx adds to Streams.
Creating Observables
As stated before Observables
are the Rx flavour of Streams that offer more features than normal Streams. There are several interesting ways to create them:
From a Stream
Any Stream can be converted to an Observable
by passing it to the Observable's
constructor:
var controller = new StreamController<String>();
var streamObservable = new Observable(controller.stream);
streamObservable.listen(print);
Periodic events
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() );
timerObservable.listen(print);
This will create an Observable
that emits values forever at a specific rate. So instead setting up a Timer it might be easier to use this.
From a single Value
Sometimes an API demands a Stream/Observable but you just have a simple value. For this the Observable has a factory method just
.
var justObservable = Observable<int>.just(42);
justObservable.listen(print);
// this will print : 42
From a Future
Future<String> asyncFunction() async {
return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult");
}
test('Create Observable from Future', () async {
print('start');
var fromFutureObservable = Observable.fromFuture(asyncFunction());
fromFutureObservable.listen(print);
Creating an Observable
from a Future
will wait for the Future
to complete and emit the result value of the Future or null
if no value is returned. Another way to create a Stream from a Future is calling toStream()
on any Future.
You may now wonder what's the sense in converting a Future to an Observable/Stream instead of just awaiting it. Rest assured, this will get clear when we now explore the available functions to manipulate data while it's "on the Stream"
Subjects
Subjects are the StreamController
of Rx and as you can imagine RxDart implements them using a StreamController
under the hood.
They behave a bit different than StreamControllers
:
- You can
listen()
directly on a Subject without accessing a Stream property. - More than just one subscription is possible and all listening parties will get the same data at the same time.
- There are three flavours of Subjects that are best explained with examples:
PublishSubjects
They behave like StreamControllers
besides that multiple listeners are allowed:
var subject = new PublishSubject<String>();
subject.listen((item) => print(item));
subject.add("Item1");
// Add a second listener
subject.listen((item) => print(item.toUpperCase()));
subject.add("Item2");
subject.add("Item3");
// this is only to prevent the testing framework to kill this process before all items on the Stream are processed
await Future.delayed(Duration(seconds: 5));
// This will cancel all Subscriptions
subject.close;
If you run that you will see:
Item1
ITEM2
Item2
ITEM3
Item3
✓ PublishSubject
You realize that the second listener that came late to the party (we will call them late subscribers) missed the first item. If this should not happen you could use a BehaviourSubject
BehaviourSubject
Every new subcriber receives the last received data item:
var subject = new BehaviorSubject<String>();
subject.listen((item) => print(item));
subject.add("Item1");
subject.add("Item2");
subject.listen((item) => print(item.toUpperCase()));
subject.add("Item3");
Will output:
Item1
ITEM2
ITEM3
Item2
Item3
✓ BehaviourSubject
You can see that Item1 is lost for the second subscriber but it gets Item2. You might be surprised that the second Subscriber gets Item3 before the first subscriber gets Item2. This because you cannot guarantee the sequence in which subscribers will get served. Still all subscribers get the data in the correct order. BehaviourSubject
only caches the last received item for late Subscribers. If you need more items cached you can use a ReplaySubject. In most cases this is not needed.
Manipulating data on the fly
The true power of Rx is that it allows you to process/manipulate data while it's passing down a Stream. Each of the Rx methods return a new Stream with the resulting data (like the second conveyor belt above) which means you can chain them together to one processing pipeline which makes them extremely powerful.
Map
If there is any Stream operation that I don't want to miss any more then it's map()
. What map()
does is, it takes each passing
data item and applies a function on it and pushes the result to its return Stream. A simple example:
var subject = new PublishSubject<String>();
subject.map((item) => item.toUpperCase()).listen(print);
subject.add("Item1");
subject.add("Item2");
subject.add("Item3");
Output:
ITEM1
ITEM2
ITEM3
✓ Map toUpper
But map doesn't need to return the same data type that it gets as input. The next example will take integers instead of Strings. Additionally we will chain two map
transformations:
var subject = new PublishSubject<int>();
subject.map((intValue) => intValue.toString())
.map((item) => item.toUpperCase())
.listen(print);
subject.add(1);
subject.add(2);
subject.add(3);
or something like this:
class DataClass{}
class WrapperClass
{
final DataClass wrapped;
WrapperClass(this.wrapped);
}
var subject = new PublishSubject<WrapperClass>();
subject.map<WrapperClass>((a) => new WrapperClass(a));
One of the most useful applications of .map
is when you receive data in a format from some REST API or from a database and you want it to convert to you own objects:
class User {
final String name;
final String adress;
final String phoneNumber;
final int age;
// In real projects I would recommend some
// serializer and not doing that manually
factory User.fromJson(String jsonString) {
var jsonMap = json.decode(jsonString);
return User(
jsonMap['name'],
jsonMap['adress'],
jsonMap['phoneNumber'],
jsonMap['age'],
);
}
User(this.name, this.adress, this.phoneNumber, this.age);
@override
String toString() {
return '$name - $adress - $phoneNumber - $age';
}
}
void main() {
test('Map', () {
// Some dummy data
var jsonStrings = [
'{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }',
'{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }',
'{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }',
];
// We simulate a Stream of json strings that we get from some API/Database with a Subject
// In reality this migh look more like some `asyncWebCallFcuntion().asStream()`
var dataStreamFromAPI = new PublishSubject<String>();
dataStreamFromAPI
.map<User>((jsonString) => User.fromJson(jsonString)) // from here on it's User objects
.listen((user) => print(user.toString()));
// Simulate incoming data
dataStreamFromAPI.add(jsonStrings[0]);
dataStreamFromAPI.add(jsonStrings[1]);
dataStreamFromAPI.add(jsonStrings[2]);
});
As a side note, not only Streams but any Iterable
offers a map
function that you can use for transformations on Lists.
Where
If you are only interested in certain values that come along the Stream you can use the .where()
function instead of using an if
statement inside your listen which is more expressive and easier to read:
var subject = new PublishSubject<int>();
subject.where((val) => val.isOdd)
.listen( (val) => print('This only prints odd numbers: $val'));
subject.where((val) => val.isEven)
.listen( (val) => print('This only prints even numbers: $val'));
subject.add(1);
subject.add(2);
subject.add(3);
prints:
This only prints odd numbers: 1
This only prints even numbers: 2
This only prints odd numbers: 3
Debounce
This is one of the little Rx gems! Imagine you have a search field that triggers a REST API call if its text is changed. Doing an API call on every single key stroke is expensive. So you would like to only make a call if the user pauses for a moment. Exactly for this is debounce()
it will swallow all incoming events if they are not followed by a pause.
var subject = new PublishSubject<String>();
subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s));
subject.add('A');
subject.add('AB');
await Future.delayed(Duration(milliseconds: 200));
subject.add("ABC");
// There is no output yet
await Future.delayed(Duration(milliseconds: 700));
// now we receive the latest value: 'ABC'
So if you convert an onChanged
handler of a TextField
to an Observable you can do so elegantly.
Expand
If your source Stream
emits arrays of objects and you want to process each object on its own you can use .expand
which will do just that:
You will see an application of this further down in the FireStore example.
Merge
If you have multiple different Streams but you want to handle its objects together you can use .mergeWith
(in other Rx implementations just merge
) which takes an array of Streams and returns one combined Stream.
.mergeWith
does not guarantee any order in that the both streams are combined. They get emitted on the resulting stream like they come in.
For example if you have two components that both report errors over a Stream and you want them both be displayed in a Dialog you could do this like (pseudo code):
@override
initState()
{
super.initState();
component1.errors.mergeWith([component2.errors])
.listen( (error) async => await showDialog(error.message));
}
or if you want a combined display of messages from multiple social networks it could look like (pseudo code)
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data));
final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data));
final postStream = observableTwitter.mergeWith([observableFacebook]);
ZipWith
zipWith
also combines one Stream with another. But, unlike .mergeWith
, it doesn't emit data as soon as it receives an item from one of its source Streams. It actually waits until an item from both source Streams has arrived and then combines them using a provided zipper Function:
The signature of zipWith
looks a bit scary but we will explore it together:
// R : the type of the resulting Stream/Observable
// S : the type of the second source Stream/Observable
// zipper: a function that takes
Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s))
A very basic example:
new Observable.just(1) // .just() creates an Observable that directly emits the past value
.zipWith(new Observable.just(2), (one, two) => one + two)
.listen(print); // prints 3
A more practical application is if you need to wait for two async functions that return a Future
and you want to process the data as soon as both results are returned. In this slightly contrived example we imagine two REST APIs, one returning a User
, the other a Product as JSON strings and we want to wait for both calls before we return an Invoice
object.
class Invoice {
final User user;
final Product product;
Invoice(this.user, this.product);
printInvoice() {
print(user.toString());
print(product.toString());
}
}
// Simulating an HTTP call to an REST API returning a Product as JSON string
Future<String> getProduct() async {
print("Started getting product");
await Future.delayed(Duration(seconds: 2));
print("Finished getting product");
return '{"name": "Flux compensator", "price": 99999.99}';
}
// Simulating an HTTP call to an REST API returning a User as JSON string
Future<String> getUser() async {
print("Started getting User");
await Future.delayed(Duration(seconds: 4));
print("Finished getting User");
return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }';
}
void main() {
test('zipWith', () async {
var userObservable =
Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString));
var productObservable = Observable.fromFuture(getProduct())
.map<Product>((jsonString) => Product.fromJson(jsonString));
Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>(
productObservable, (user, product) => Invoice(user, product));
print("Start listening for invoices");
invoiceObservable.listen((invoice) => invoice.printInvoice());
// this is only to prevent the testing framework from killing this process before all items on the Stream are processed
await Future.delayed(Duration(seconds: 5));
});
}
Looking at the output you can see how asynchronously this is executed
Started getting User
Started getting product
Start listening for invoices
Finished getting product
Finished getting User
Jon Doe - New York - 424242 - 42
Flux compensator - 99999.99
CombineLatest
combineLatest
combines the values of one or more Streams like merge and zip but in slightly different way. It listens on more Streams and emits a combined value whenever a new value from one of the Streams arrive. The interesting part is that it does not only emits the changed value but also the last received values of all other source streams. Observe this animation closely:
Before combineLatest
emits its first value all source streams have to deliver at least one item.
Unlike the methods before combineLatest
isn't a instance method of Observable
but a static method. Also as Dart does not allow operator overloading there are versions of combineLastest
depending on the number of source streams: combineLatest2...combineLatest9
A nice application of combineLatest
is for instance if you have two Observable<bool>
that signal that some parts of your App are busy and you want to display a busy spinner if one of them is busy. With merge this can look like (pseudo code):
class Model
{
Observable<bool> get isBusy =>
Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2);
PublishSubject<bool> isBusyOne;
PublishSubject<bool> isBusyTwo;
}
In your UI you then can consume isBusy
with a StreamBuilder
to display a Spinner if the received value is true
.
combineLatest
is also powerful function in combination with FireStore snapshots
streams.
Imagine you want to build an App that displays a news ticker together with a weather forecast. Ticker messages and weather data are stored in two different FireStore collections. Both get updated independently from some backend service. You want to display the data updates using a StreamBuilder. With combineLatest
this is easy:
class WeatherForecast {
final String forecastText;
final GeoPoint location;
factory WeatherForecast.fromMap(Map<String, dynamic> map) {
return WeatherForecast(map['forecastText'], map['location']);
}
WeatherForecast(this.forecastText, this.location);
}
class NewsMessage {
final String newsText;
final GeoPoint location;
factory NewsMessage.fromMap(Map<String, dynamic> map) {
return NewsMessage(map['newsText'], map['location']);
}
NewsMessage(this.newsText, this.location);
}
class CombinedMessage {
final WeatherForecast forecast;
final NewsMessage newsMessage;
CombinedMessage(this.forecast, this.newsMessage);
}
class Model {
CollectionReference weatherCollection;
CollectionReference newsCollection;
Model() {
weatherCollection = Firestore.instance.collection('weather');
newsCollection = Firestore.instance.collection('news');
}
Observable<CombinedMessage> getCombinedMessages() {
Observable<WeatherForecast> weatherForecasts = weatherCollection
.snapshots()
.expand((snapShot) => snapShot.documents)
.map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data));
Observable<NewsMessage> news = newsCollection
.snapshots()
.expand((snapShot) => snapShot.documents)
.map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
return Observable.combineLatest2(
weatherForecasts, news, (weather, news) => CombinedMessage(weather, news));
}
}
In your UI you could then use this like: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...)
.
Distinct
In the scenario described above it could happen that isBusyOne
and isBusyTwo
both issue the same value which would lead to an update of the UI with the same data. Do prevent this we can use .distinct()
. It makes sure that data is only passed down the Stream if the value of a new item is different from the last one. So we would change the code to:
Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
Which also shows that you can chain as many Rx operators as you want! 🙂
In other implementations of Rx distinct
is named distinctUntilChanged
AsyncMap
Besides map()
there is also the asyncMap
function which allows you to use an async function as mapping function. Let's imagine a slightly different setting for our FireStore example above. Now the needed WeatherForecast
depends on the location
of the NewsMessage
and only has to get updated when a new NewsMessage
is received:
Observable<CombinedMessage> getDependendMessages() {
Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) {
return snapShot.documents;
}).map<NewsMessage>((document) {
return NewsMessage.fromMap(document.data);
});
return news.asyncMap((newsEntry) async {
var weatherDocuments =
await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments();
return new CombinedMessage(
WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry);
});
}
The Observable
returned by getDependendMessages
will emit a new CombinedMessage
each time the newsCollection
changes.
Debugging Observables
As nice as Rx is when everything works it seems that it's almost impossible to debug an expression like the above
Observable<NewsMessage> news = newsCollection
.snapshots()
.expand((snapShot) => snapShot.documents)
.map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
But keep in mind that =>
is only a short form for an anonymous function. Using the handy quick action on the arrows Convert to block body you get:
Observable<NewsMessage> news = newsCollection
.snapshots()
.expand((snapShot) {
return snapShot.documents;
})
.map<NewsMessage>((document) {
return NewsMessage.fromMap(document.data);
});
Which means we now can set breakpoint or add print statements on every stage of our processing pipeline.
Beware of side effects
This cannot be emphasised enough! If you want to benefit of the Rx to make your code more reliable you have to buy in that Rx is about transforming data while it moves over the conveyor belt. So never call functions from one of the processing functions that modifies any variables/state outside of the processing pipeline until you reach the .listen
function. So instead of doing this:
Observable.fromFuture(getProduct())
.map<Product>((jsonString) {
var product = Product.fromJson(jsonString);
database.save(product);
setState((){ _product = product });
return product;
}).listen();
do this
Observable.fromFuture(getProduct())
.map<Product>((jsonString) => Product.fromJson(jsonString))
.listen( (product) {
database.save(product);
setState((){ _product = product });
});
The duty of map()
is transforming the data on the Stream, NOTHING else! If the passed mapping function does something else this would be considered as a side effect which are hard to spot when reading code and hide easily bugs.
Some thoughts on freeing resources
To avoid any memory leaks always cancel()
your subscriptions or dispose your StreamControllers, or close your Subjects as soon as you do not need them anymore.
Final thoughts
Wow congratulations if you stayed with me to this point. I know at this time it is a lot of stuff to devour. But you are now not only able to use Rx to make your life easier, but you are also prepared for the next posts where we dive into the details of RxVMS
Actually Rx has a lot more great functions than the one I showed you here. If you got the Rx virus and want to explore more check out http://reactivex.io/documentation/operators.html
On Gitter there is even a support channel for RxDart where you will find help if you get stuck somewhere in your processing pipeline https://gitter.im/ReactiveX/rxdart
Thanks so much Thomas, this was a long awaited post! As always, excellent job explaining complicated concepts, keep it up 🙂
How did you make these amazing animations?
I really want to make some for my posts as well.
In the end I only found PowerPoint an easy solution for such Animations.
Keep them coming Thomas, great stuff.
Thanks a lot!
I fell in love with RxDart,: P, waiting for the next tutorial 🙁
Yep if you once get started you don’t wanna stop 😀
I can not find another tutorial on RxVMS,
You know some, apart from yours, of course. 😉
I’m still working on the next post on it. As this is my pattern and its pretty new you won’t find much other information.
If you cannot wait 😀 here is my London Talk on it https://skillsmatter.com/skillscasts/12265-flutterldn-september
Love your talk. I’ve watched it multiple times now.
Is there any chance you can cover how best to manage an existing stream or a periodic interval with RxCommand? Trying to apply the suggestions from https://www.dartlang.org/articles/libraries/creating-streams without much luck.
A simple timer would do. I’d love to see your suggestions.
Could you open an issue for this on the rx_command repo and elaborate a bit more what you want to achieve
thank you sensei, I will see it
thank you sensei, I will see it
How can we test this on Unit Tests?
Have a look here:
https://stackoverflow.com/questions/51956957/how-to-test-a-stream-in-dart
As every Observable is also a Stream you can use the same approach.
Looking forward for next posts!
Amazing!!! when the next articles are coming ???!?!?!?!?!?
What a great article, thanks!
Thanks a lot!
This series is awesome. Cannot wait for you to continue it!
Great post
I’ve used RxJS for a few years, and for beginners I’d advice caution not going overboard with Rx. It doesn’t take much effort to compose very hard to read streams. I haven’t tried RxDart yet, so maybe it doesn’t have the operators that can get you in a lot of trouble, like mergeMap, switchMap etc.
In any case, I wanna compliment you on this article. The animations should make it easy for beginners to understand how streams work, and your article in general was very detailed and gave insight into a lot of great operators.
[…] Автор оригинала: Thomas Burkhart […]
Nice detailed explanation!
Hi, Thomas.
Which tool do you use to make these animated illustrations?
Valery.
In the end I only found PowerPoint an easy solution for such Animations.
[…] my issue: Update streams based on filters in Dart/Flutter Merge Firestore streams using rxDart https://www.burkharts.net/apps/blog/rxdart-magical-transformations-of-streams/ […]
Hi,Thomas
Whats the best approach to display snackbar/toast when a Stream emit error?
It’s a bit tricky from non UI code 🙂
Best is to listen to the stream with an error handler from the UI in some StatefulWidget’s initstate.
Ohh god, you’re really a magician!
Thanks for the excellent content and explanation!
For sure now i can make better apps
Great post! Thank you.