Reimplementation of Observables in TypeScript

Not reinventing the wheel is, most of the time, a good idea. In my day job, I work in the healthcare industry. In regulated industries, a lot of controls are imposed to ensure the quality of your deliverables, and we are all thankful for that! One of the consequences is that when you want to use a third-party library, you have to perform a thorough analysis of its currently outstanding issues, how these issues impact you, and what controls you will implement to hedge against any risks, if applicable. This analysis is not easy to do, it can take quite some time, and it has to be repeated each time you either 1/ update the dependency or 2/ significantly change the way you use it.

Consequently, each time you need to use a wheel, you will have to analyze this wheel, understand its weaknesses, and explain what you are going to do to make sure those weaknesses do not affect the car you are making. You then have to do this again for each iteration of your car or of the wheel itself. In those circumstances, reinventing the wheel can sometimes be a good idea!

Introducing the Observer pattern. Described in the Gang of Four book, it is a very useful and simple design pattern that has gained a lot of traction in the last few years. In web development it is a natural extension of the promise pattern, generalizing the concept of an asynchronous call to asynchronous streams. As it happened, we needed some sort of observable implementation in its simplest expression. We did not need anything fancy like operators or the ability to create various types of observables. Only the basic implementation was needed for our case, and we thought that introducing a third-party library for its basic implementation of the pattern would have generated a ton of paperwork for little benefit. In the end we opted to re-implement the pattern in TypeScript, which turned out to be simpler and more instructive than we would have initially thought.

General overview of the pattern

The starting point is an asynchronous process which generates events. The observer wants to be notified of these events to execute some action. The observable is the object in charge of notifying the observers when those events are generated.

First off, we will define what an observer is:


      export interface Observer<T> {
        next?(value: T): void;
      }
    
An observer is simply an object with a next method, which will be called on every new event. Pay attention to the generic parameter. We could have used any here to allow anything, but we want the TypeScript compiler to let us know if we are trying to do anything stupid, like expecting a number when the event is of a string type.
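
To make the benefit of the generic parameter concrete, here is a small sketch. The `emitAll` helper and the sample values are hypothetical, introduced only for illustration; they are not part of the implementation discussed in this article.

```typescript
interface Observer<T> {
  next?(value: T): void;
}

// Hypothetical helper: pushes every value to the observer, if it defines `next`.
function emitAll<T>(values: T[], observer: Observer<T>): void {
  for (const value of values) {
    observer.next?.(value);
  }
}

const received: string[] = [];
const stringObserver: Observer<string> = {
  // `value` is inferred as string, so string methods are available.
  next: (value) => { received.push(value.toUpperCase()); },
};

emitAll(['a', 'b'], stringObserver);

// The compiler rejects a mismatch between the event type and the observer type:
// emitAll([1, 2], stringObserver); // compile-time error
```

With `any` in place of `T`, the commented-out call would compile and only fail at run time, when `toUpperCase` is called on a number.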

The API of the observable object is pretty simple too. A subscribe method to which an observer is provided:


      interface Observable<T> {
        subscribe(observer: Observer<T>): void;
      }
    
All well and good, but this only covers the nominal case: the observer subscribes to the observable, and the latter then calls it back on every event. But what happens when no more events are expected? The observer should be notified through another channel.

      export interface Observer<T> {
        next?(value: T): void;
        complete?(): void;
        error?(errValue: any): void;
      }
    
For good measure, we also throw in an error-handling callback, in case our observable has some kind of failure. Note that none of those functions are mandatory: we might be interested in only the completion signal, for example.
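
Because every callback is optional, whatever drives the observer has to check that a callback exists before calling it. A minimal sketch, assuming a hypothetical `countdown` producer that is not part of the original code:

```typescript
interface Observer<T> {
  next?(value: T): void;
  complete?(): void;
  error?(errValue: any): void;
}

// Hypothetical producer: emits from..1, then signals completion.
// A negative start is reported through the error channel instead.
function countdown(from: number, observer: Observer<number>): void {
  if (from < 0) {
    observer.error?.(new RangeError('invalid start: ' + from));
    return;
  }
  for (let i = from; i > 0; i--) {
    observer.next?.(i); // optional call: skipped when `next` is undefined
  }
  observer.complete?.();
}

// An observer interested only in the completion signal.
let done = false;
countdown(3, { complete: () => { done = true; } });

// An observer interested in values and errors, but not in completion.
const values: number[] = [];
const errors: unknown[] = [];
const valueObserver: Observer<number> = {
  next: (v) => { values.push(v); },
  error: (e) => { errors.push(e); },
};
countdown(3, valueObserver);
countdown(-1, valueObserver);
```

The optional call syntax (`?.()`) keeps the producer free of explicit `if` checks while still tolerating partial observers.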

So how do we use this? Well, assuming those interfaces have their proper base implementation, we'll use them this way:


      // BaseObservable is assumed to be an implementation of Observable<T>
      const observable = new BaseObservable<number>();
      observable.subscribe({
        next: (value: number) => console.log('event', value),
        error: (errValue: any) => { throw new Error(String(errValue)); },
      });
    
The events are generated by the observable once the subscribe function is called. What if the observer wants to unsubscribe from the notifications even though the stream is not over yet? For this, the subscribe function can return an unsubscription function:

      interface Observable<T> {
        subscribe(observer: Observer<T>): () => void;
      }
    
Should you want to unsubscribe, you would just call that function:

      const unsubscription = observable.subscribe({
        next: (value: number) => console.log('event', value),
      });
      // later
      unsubscription();
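
To show how the pieces fit together, here is a minimal sketch of an observable that honours this interface. It is a sketch under stated assumptions, not a definitive implementation: the `SimpleObservable` class, its producer-function constructor, the `closed` flag and the `push` variable in the usage part are all hypothetical names introduced for illustration.

```typescript
interface Observer<T> {
  next?(value: T): void;
  complete?(): void;
  error?(errValue: any): void;
}

interface Observable<T> {
  subscribe(observer: Observer<T>): () => void;
}

// Assumption: the producer receives the observer and returns a teardown function.
type Producer<T> = (observer: Observer<T>) => () => void;

class SimpleObservable<T> implements Observable<T> {
  private readonly producer: Producer<T>;

  constructor(producer: Producer<T>) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): () => void {
    let closed = false;
    let teardown: (() => void) | undefined;

    // Closes the subscription once; later calls are no-ops.
    const close = (): void => {
      if (closed) return;
      closed = true;
      teardown?.();
    };

    // Wrap the observer so nothing is delivered once the subscription is closed.
    const guarded: Observer<T> = {
      next: (value) => { if (!closed) observer.next?.(value); },
      complete: () => { if (!closed) { close(); observer.complete?.(); } },
      error: (err) => { if (!closed) { close(); observer.error?.(err); } },
    };

    teardown = this.producer(guarded);
    // The producer may have finished synchronously, before `teardown` was set.
    if (closed) teardown();
    return close;
  }
}

// Usage: events are pushed by hand so the effect of unsubscribing is visible.
const log: string[] = [];
let push: (n: number) => void = () => {};
const numbers = new SimpleObservable<number>((observer) => {
  push = (n) => observer.next?.(n);
  return () => { log.push('teardown'); };
});

const unsubscribe = numbers.subscribe({
  next: (value) => { log.push('event ' + value); },
});
push(1);
push(2);
unsubscribe();
push(3); // ignored: the subscription is closed
```

The guard around the observer is one possible design choice: it keeps the "no events after unsubscription" rule in a single place instead of relying on every producer to respect it.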
    

Of course, this is a rather simplistic interface definition of the Observable pattern, but a reasonably useful implementation is not that far away. Check out this one, which I implemented to follow the interface defined in the TC39 proposal and which is largely inspired by the TC39 proof of concept.