Fundamentals of Dart Streams

Welcome to part 2 of my series on Flutter Architecture:

Streams are the main building block of RxVMS so we will take a closer look at them in this post because understanding Streams is an absolute requirement to understand RxVMS.

It turned out that including Rx in this post would make it too long so I split it two parts.

Let it flow

I read a lot of comments that say Streams and especially Rx is too hard to understand, so they don’t use it.

I like you to know is that I don’t count myself a Rx wizard. Mastering the full power of Rx is not easy I admit that and I’m still learning. But let me set right one misconception from the beginning: You don’t have to be an Rx wizard to gain a lot of benefits using Streams and Rx. I will try to do my best to explain Streams as easy as I can.

What are Streams?

For me, the best analogy for a Stream is a conveyor belt. You can put an item on one side and it will automatically be transported to the other. Streams act like a conveyor belts, but instead of physical items we can put a data object on the belt and it will transport them automatically, but to where? Well, as with a real conveyor, if noone is there to catch them at the end then the items will just drop off and get lost. (Ok, this isn’t entirely true with Dart Streams but it’s best to treat Streams as if it were.)

stream_no_listener

To avoid having your data object drop off into the void you can set up a “trap” at the end of a Stream. The trap will catch any arriving object and do something (react) each time it catches an item that reaches the end of the belt.

stream_listener

Remember:

  • When your data reaches the end of your Stream, if you haven’t set up a trap to handle that even then your data will fall off into the void and never be seen again. (Again, not entirely true with Dart Streams but you’re better off pretending it is.)

  • After putting an object on the Stream you don’t wait for it to reach the end because the Stream will handle all of that in the background.

  • Your trap can receive an item at anytime; it doesn’t have to happen right after you put something on the Stream (don’t worry Streams transport Data really fast). Imagine you don’t know how fast the belt moves or how long it is. This means putting something on a Stream is completely decoupled from reacting on an item at the other end. Your trap will trigger and catch the item whenever it gets there. (Some of you might already realize that this fits nicely into the reactive way that Flutter updates its Widgets.)

  • You can setup a trap long before the first item will arrive.

  • It’s “First In First Out”. Items will always come out in the same order they went in.

And what is Rx?

Rx, short for Reactive Extensions, are Streams on steroids 😉 Rx is a concept very similar to Streams that was invented for the .net framework by a team at Microsoft. As .net already had a type Stream that is used for file I/O, they named them Observables and created a lot of functions to manipulate the data that moves through them. Dart has Streams embedded in its language specification which already offer a lot of this functionality, but not all of it. That’s why the RxDart package was developed; it’s based on Dart Streams, but takes things further by extening their functionality. Wie will take a closer look at Rx in the next part of this series.

Some nomenclature

Dart Streams and Rx use some nomenclature that can look a bit scary, so here’s a translation. I’ll show Dart first, then Rx.

  • Stream/Observable This is the conveyor belt discussed above. A Stream can always be converted to an Observable and you can assign Observables anywhere you use a Stream. So don’t get confused if I use these terms interchangeably.

  • listen/subscribe Setting the “trap”.

  • StreamController/Subject The “left” side of the conveyor belt, where you put data on the Stream. The two differ slightly in their properties and features, but serve the same purpose.

  • Emitting an item/data The moment when data appears at the end of the conveyor belt.

Creating Streams

If you want to follow along with the following chapters, please clone this project for the basic Stream examples. I’ll be using the Dart/Flutter testing system.

To create a Stream you have to create a StreamController

var controller = new StreamController<String>();

controller.add("Item1"); // Put first item on the belt

The generic type (in this case String) that has to be passed when creating a StreamController defines which type of objects we can push onto the Stream. This can be ANY type! You can create a StreamController<List<MyObject>>() if you want to, and it will transport a whole List instead of single objects.

Building the trap

If you run the test above, then nothing will visibly happen because there’s nothing at the end to catch your Strings. So, lets create a trap:

var controller = new StreamController<String>();


controller.stream.listen((item) => print(item)); // this is the trap

controller.add("Item1");
controller.add("Item2");
controller.add("Item3");

Now we’ve got the trap set, using the .listen() method on the stream of the StreamController. It’s written “controller.stream.listen” but if you run it backward, like a record album from the 60’s, it’s real meaning will be revealed. “Listen to the Stream with this Controller.”

You have to pass a function into the .listen() method because it needs to know what to do when it “hears” something (when a new data object arrived at the end of the Stream). This function that you’re passing in needs to accept a parameter that is of the same type the StreamController was created with. In this case, it was a String: new StreamController<String>();

If you run the above code it will output:

Item1
Item2
Item3
✓ Setting the trap
Exited

In my opinion the biggest problem for people who are new to Streams is that you can define the reaction for an emitted item long before the first item is is pushed on the stream that causes that reaction to get called.

Stop listening

The above code example omitted a small but important part. listen() is a function that returns a StreamSubscription object. Calling .cancel() on it frees the subscription, preventing your listening function from being called after you want to cancel it:

var controller = new StreamController<String>();

StreamSubscription subscription = controller.stream.listen((item) => print(item)); // This is the Trap

controller.add("Item1");
controller.add("Item2");
controller.add("Item3");

// This is to prevent the testing framework from killing this process 
// before all items from the Stream have been taken care of
await Future.delayed(Duration(milliseconds: 500));

subscription.cancel;

Listen to the details

As wrote above, listen() expects a function that takes a parameter, and this parameter needs to be of the same type that the StreamController was created with. This function can be a lambda, like in the example above, or it can be any other function.

The following are all valid examples:

void myPrint(String message) {
  print(message);
}

StreamSubscription subscription = controller.stream.listen((item) => print(item)); // using a lambda function

StreamSubscription subscription2 = controller.stream.listen(myPrint); // using tear-off

StreamSubscription subscription3 = controller.stream.listen((item) {
    print(item);
    print(item.toUpperCase);
}); // using lambda block

Important: Most Dart Streams are single subscription streams, meaning that after they’ve listened once they can’t listen a second time without throwing an exception. This is different than other implementations of Rx

The full signature of listen() looks like this:

    /* excerpt from the API doc
   * The [onError] callback must be of type `void onError(error)` or
   * `void onError(error, StackTrace stackTrace)`. If [onError] accepts
   * two arguments it is called with the error object and the stack trace
   * (which could be `null` if the stream itself received an error without
   * stack trace).
   * Otherwise it is called with just the error object.
   * If [onError] is omitted, any errors on the stream are considered unhandled,
   * and will be passed to the current [Zone]'s error handler.
   * By default unhandled async errors are treated
   * as if they were uncaught top-level errors.
   *
   * If this stream closes and sends a done event, the [onDone] handler is
   * called. If [onDone] is `null`, nothing happens.
   *
   * If [cancelOnError] is true, the subscription is automatically canceled
   * when the first error event is delivered. The default is `false`.
    */
  StreamSubscription<T> listen(void onData(T event),
      {Function onError, void onDone(), bool cancelOnError});

This means you can do more than simply pass one handler for emitted data, you can also have a handler in case of errors and if the Stream is closed from the side of the Stream controller (onDone). Exceptions that get thrown from inside the Stream will call onError() if you provide it, otherwise they’re just swallowed and you’ll never know that something went wrong.

Flutter Stream Example

To make it easier for you to follow the next chapters, I made a separate branch in the repository.
Please clone/switch to this branch https://github.com/escamoteur/stream_rx_tutorial/tree/counter_stream

For the first example I took the well known counter App that you get when you create a new Flutter Project and refactored it a bit. I added a model class to hold the state of the App, which is basically just the counter’s value:

class Model
{
    int _counter = 0;
    StreamController _streamController = new StreamController<int>();

    Stream<int> get counterUpdates => _streamController.stream;

    void incrementCounter()
    {
        _counter++;
        _streamController.add(_counter);
    }
}

here you can see a very typical pattern: Instead of publishing the whole StreamController, we just publish its Stream property.

In order to make the Model accessible by the UI, I made it a static field in the App object because I didn’t want to introduce an InheritedWidget or a ServiceLocator. We can get away with this for a simple example but I wouldn’t want to do it with a real App!

In main.dart we add:

class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;
  StreamSubscription streamSubscription;

  @override
  void initState() {
    streamSubscription = MyApp.model.counterUpdates.listen((newVal) => setState(() {
          _counter = newVal;
        }));

    super.initState();
  }

  // Although this state will not get destroyed as long as the App is running its good
  // style to always free subscriptions
  @override
  void dispose() {
      streamSubscription?.cancel();
      super.dispose();
    }

InitState() is a good place to set up the ‘trap’, and being good Dart citizens we always make sure to free the subscription in dispose(), right?

In the widget tree we just need to adapt the onPressed handler of the FAB (Floating Action Button).

    floatingActionButton: new FloatingActionButton(
            onPressed: MyApp.model.incrementCounter,
    tooltip: 'Increment',
    child: new Icon(Icons.add),
    ),

By doing this, we’ve created a clean separation between View and Model, using a Stream.

Using a StreamBuilder

Source: https://github.com/escamoteur/stream_rx_tutorial/tree/streambuilder

Instead of using initState() and setState() for our needs, Flutter comes with a handy Widget called StreamBuilder. As you may have guessed, it takes a Stream and a builder function, calling it whenever a new value is emitted by the Stream. We don’t need initState or dispose for that:

body: new Center(
  child: new Column(
    mainAxisAlignment: MainAxisAlignment.center,
    children: <Widget>[
      new Text(
        'You have pushed the button this many times:',
      ),
      StreamBuilder<int>(
          initialData: 0,
          stream: MyApp.model.counterUpdates,
          builder: (context, snappShot) {
            String valueAsString = 'NoData';

            if (snappShot != null && snappShot.hasData) {
              valueAsString = snappShot.data.toString();
            }

            return Text(
              valueAsString,
              style: Theme.of(context).textTheme.display1,
            );
          }),
    ],
  ),
),

We’re almost at the end, I promise. Here are three things that are good to know:

  • In the above layout, one big advantage of using StreamBuilder compared to the first solution is that calling setState() in listen() will always rebuild the full page, while StreamBuilder will only call its builder.

  • The snapShot variable contains the most recent data that was received from the Stream. Always check to ensure it contains valid data before using it.

  • Because of the sequence things are initialized in during build(), the StreamBuilder cannot receive a value during the very first frame. To get around this, we pass a value for initialData that gets used for the first build, meaning the first frame, of the screen. If we don’t pass initialData our builder function will get called with an invalid snapShot on the first frame. An alternative to using initialData is to return a ‘place holder widget’ if `snapShot is invalid that is displayed until we receive valid data like:

// Let's assume our stream does not emit items based on a button but on some update in a database
StreamBuilder<int>(
    stream: MyApp.model.databaseUpdates,
    builder: (context, snappShot) {

    if (snappShot != null && snappShot.hasData) {
        return Text(
          snappShot.data.toString(),
          style: Theme.of(context).textTheme.display1,
        );
    }

    // Until we receive a valid data we display a waiting Spinner
    return CircularProgressIndicator ();
  }),

In the next post we’ll take a look at how to transform the data on our Streams and do it on the fly. But please be patient, I won’t be back from vacation till the end of August. Many thanks to Scott Stoll for proof reading and important feedback.

It is really important to me that you understand Streams. If you didn’t understand something in this post or think it could be better explained please let me know so that I can improve it.

Contact me:

14 thoughts on “Fundamentals of Dart Streams

  1. 바카라 says:

    Simply want to say your article is as astonishing. The clearness in your post is simply nice and i can assume you’re
    an expert on this subject. Fine with your permission allow me to grab your feed to keep updated
    with forthcoming post. Thanks a million and please continue the
    rewarding work.

  2. GabrieleMazzola says:

    Hey these posts are awesome! Thank you very much for the hard work you’re doing.

    Can’t wait to read your next posts of this series! Have a good vacation!

  3. Yeah bookmaking this wasn’t a high risk decision outstanding
    post!

  4. mTony says:

    Keep up the excellent work, looking forward to the next one 🙂

  5. Tuan Vu says:

    Didn’t realize Dart comes with StreamController. Looks like a good “gateway” library before your next post on Rx. Have a great vacation. The wait will be worse than the Avengers cliffhanger:)

  6. Valerio Filhote says:

    Congratulations !! This article open my mind about how usefull, easy and clean to work with stream in Flutter. Thanks a lot.

  7. Joseph says:

    wow, can’t wait for your next post! enjoy your vacation!

  8. Ken Lee says:

    Hi,

    Thanks a lot for the example. Just want to get your thoughts of ‘what-if’ below:

    What if we just expose out the stream (which is counterUpdates) and then create another reference for Sink called inCounter (StreamSink get inCounter => _streamController.stream;) so that we only expose out *sink* and *stream* and letting the business logic/counter increase fully independent on the model class itself?

    Of course, from the FAB, during onPressed it will just MyApp.model.inCounter…

    This is to separate the business logic independently. Any widget/page that need to access it whether for the purpose of listening the new data or updating the data, the model/bloc will manage itself.

    • Thomas Burkhart says:

      Yes that’s the way the BLOC pattern does this. I wanted to keep the example as easy as possible. In the next posts I will show you how I use RxCommands for exactly this purpose

Leave a Reply

Your email address will not be published. Required fields are marked *