The dataflow engine I gave in my last post can be seen as an
implementation of self-adjusting computation,
in the style of Acar, Blelloch and Harper's original POPL 2002
paper *Adaptive Functional Programming*.
(Since then, state-of-the-art implementation techniques have
improved a lot, so don't take my post as indicative of what
modern libraries do.)

Many people have seen resemblances between self-adjusting computation and functional reactive programming --- a good example of this is Jake Donham's Froc library for OCaml. Originally, I was one of those people, but that's no longer true: I think SAC and FRP are completely orthogonal.

I now think that FRP libraries can be very minimalistic --- my ICFP
2013 paper
*Higher-Order Reactive Programming without Spacetime Leaks*
gives a type system, implementation, and correctness proof for
an FRP language with full support for higher order constructions
like higher-order functions and streams of streams, while at the
same time statically ruling out space and time leaks.

The key idea is to distinguish *stable values* (like
ints and bools), whose representation doesn't change over time, from
*dynamic values* (like streams), whose representation is
time-varying. Stable values are the usual datatypes, and can be used
whenever we like. But dynamic values have a scheduling constraint: we
can only use them at certain times. For example, with a stream, we
want to look at the head at time 0, the head of the tail at time 1,
the head of the tail of the tail at time 2, and so on. It's a
mistake to look at the head of the tail of a stream at time 0, because
that value might not be available yet.

With an appropriate type discipline, it's possible to ensure scheduling correctness statically, but unfortunately many people are put off by modal types and Kripke logical relations. This is a shame, because the payoff of all this is that the implementation strategy is super-simple -- we can just use plain-vanilla lazy evaluation to implement FRP.

Recently, though, I've figured out how to embed this kind of FRP
library into standard functional languages like OCaml. Since we can't
define modal type operators in standard functional languages, we have
to give up some static assurance, and replace the *static*
checks of time-correctness with *dynamic* checks, but we are
still able to rule out space leaks by construction, and still get a
runtime error if we mis-schedule a program. Essentially, we can
replace *type checking* with *contract checking*.
As usual, you can find the code on GitHub here.

Let's look at an OCaml signature for this library:

    module type NEXT = sig
      type 'a t
      exception Timing_error of int * int

      val delay : (unit -> 'a) -> 'a t
      val map : ('a -> 'b) -> 'a t -> 'b t
      val zip : 'a t * 'b t -> ('a * 'b) t
      val unzip : ('a * 'b) t -> 'a t * 'b t
      val ($) : ('a -> 'b) t -> 'a t -> 'b t  (* This op is redundant but convenient *)

      val fix : ('a t -> 'a) -> 'a

      (* Use these operations to implement an event loop *)
      module Runtime : sig
        val tick : unit -> unit
        val force : 'a t -> 'a
      end
    end

The NEXT signature introduces a single type constructor `'a t`,
which can be thought of as the type of computations which
are scheduled to be evaluated on precisely the next tick of the
clock. The elements of `'a t` are *dynamic* in the
sense that I mention above: we are only permitted to evaluate them on
the next tick of the clock, and evaluating them at any other time is an
error.

To model this kind of error, we also have a `Timing_error`
exception, which signals an error whose first argument contains the
time a thunk was scheduled to be evaluated, and whose second argument
contains the actual time.

Elements of `'a t` are the only primitive way to create
dynamic values -- other values (like function closures) can be
dynamic, but only if they end up capturing a next-step thunk.

The `delay` function lets us create a next value from a
thunk, and the `map` function maps a function over a thunk.
The `zip` and `unzip` functions are used for pairing,
and the `$` operation is the McBride/Paterson idiomatic
application operator. (Technically, it's derivable from `zip`,
but it's easiest to throw it into the basic API.)

The `fix` operation is the one that really makes
reactive programming possible -- it says that *guarded recursion*
is allowed. So if we have a function which takes an `'a t`
and returns an `'a`, then we can take a fixed point. This
fixed point will never block the event loop, because its
type ensures that we always delay by a tick before making a recursive
call.

This raw interface is, honestly, not so useful as is, but the slightly miraculous fact is that this is the complete API we need to build all the higher-level abstractions --- like events and streams --- that we need to do real reactive programming.

Now, let's see what an implementation of this library could look like.

    module Next : NEXT = struct
      let time = ref 0

We can keep track of the current time in a reference cell.

      type 'a t = { time : int; mutable code : 'a Lazy.t }

The type of a thunk is a record consisting of a lazy thunk, and the time when it is safe to force it.

      type s = Hide : 'a t -> s
      let thunks : s list ref = ref []

We also have a list that stores all of the thunks that we've allocated. We'll use this list to enforce space-safety, by mutating any thunk that gets too old.

      exception Timing_error of int * int

      let delay t =
        let t = { time = 1 + !time; code = Lazy.from_fun t } in
        thunks := (Hide t) :: !thunks;
        t

When we create a thunk with the `delay` function, we are
creating a thunk to be forced on the next time tick. So we can dereference
`time` in order to find out the current time, and add 1 to
get the scheduled execution time for the thunk. We also add it to the
list `thunks`, so that we can remember that we created it.

      let force t =
        if t.time <> !time then raise (Timing_error (t.time, !time))
        else Lazy.force t.code

Forcing a thunk just forces the code thunk, *if* the current time
matches the scheduled time for the thunk. Otherwise, we raise a
`Timing_error`. Note that memoization is handled by OCaml's
built-in `'a Lazy.t` type.
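The timing check can be tried out in isolation with a cut-down sketch. The module name `MiniNext`, the `advance` helper, and `try_force` are assumptions made for this illustration and are not part of the library; `advance` only bumps the clock and does none of the cleanup that the real `tick` performs.

```ocaml
(* Cut-down sketch of delay/force. MiniNext, advance and try_force are
   illustrative names, not part of the library in this post. *)
module MiniNext = struct
  exception Timing_error of int * int

  let time = ref 0

  type 'a t = { time : int; code : 'a Lazy.t }

  (* Schedule a computation for the tick after the current one. *)
  let delay f = { time = 1 + !time; code = Lazy.from_fun f }

  (* Run the computation only if the clock matches its scheduled time. *)
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code

  (* Hypothetical helper: advance the clock without any cleanup. *)
  let advance () = incr time
end

(* Ok v if the force succeeds, Error (scheduled, actual) otherwise. *)
let try_force x =
  match MiniNext.force x with
  | v -> Ok v
  | exception MiniNext.Timing_error (scheduled, actual) ->
    Error (scheduled, actual)

let too_early, on_time, too_late =
  let x = MiniNext.delay (fun () -> 21 * 2) in
  let a = try_force x in (* clock = 0, thunk scheduled for 1 *)
  MiniNext.advance ();
  let b = try_force x in (* clock = 1, thunk scheduled for 1 *)
  MiniNext.advance ();
  let c = try_force x in (* clock = 2, thunk scheduled for 1 *)
  (a, b, c)
```

Here `too_early` and `too_late` both carry the scheduled and actual times from the exception, and only the force performed on the matching tick yields the value.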

      let map f r = delay (fun () -> f (force r))
      let zip (r, r') = delay (fun () -> (force r, force r'))
      let unzip r = (map fst r, map snd r)
      let ($) f x = delay (fun () -> force f (force x))

The `map`, `zip`, `unzip`, and `($)` operators just force and delay things in the obvious places.

      let rec fix f = f (delay (fun () -> fix f))

The fixed point operation looks exactly like the standard lazy fixed point.
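As a rough illustration of what guarded recursion buys us, the sketch below uses `fix` twice: once with the recursive reference used under a delay, and once with it forced immediately. The `Next` module here is a compact copy with a simplified `tick` that only advances the clock, and the names `from`, `unguarded_fails` and `first_three` are made up for the example.

```ocaml
(* Compact copy of the parts of Next needed here. The tick below is a
   simplification that only advances the clock. *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = 1 + !time; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let ( $ ) f x = delay (fun () -> force f (force x))
  let rec fix f = f (delay (fun () -> fix f))
  let tick () = incr time
end

open Next

type 'a stream = Cons of 'a * 'a stream Next.t

(* Guarded: the recursive call sits under a delay, so it runs one tick later. *)
let from : int -> int stream =
  fix (fun loop n -> Cons (n, loop $ delay (fun () -> n + 1)))

(* Unguarded: forcing the recursive reference in the current tick is a
   timing error (scheduled for tick 1, forced at tick 0), not a loop. *)
let unguarded_fails =
  match fix (fun self -> (force self : int)) with
  | _ -> false
  | exception Timing_error (1, 0) -> true

(* Read three elements, ticking the clock between reads. *)
let first_three =
  let (Cons (a, rest)) = from 10 in
  tick ();
  let (Cons (b, rest)) = force rest in
  tick ();
  let (Cons (c, _)) = force rest in
  [a; b; c]
```

The unguarded definition fails fast with a `Timing_error` instead of diverging, which is the dynamic counterpart of the guarantee the type of `fix` is meant to express.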

      module Runtime = struct
        let force = force

The runtime exposes the `force` function to the event loop.

        (* Returns true for thunks that are still live, so that List.filter keeps them *)
        let cleanup (Hide t) =
          let live = t.time >= !time in
          (if not live then t.code <- lazy (assert false));
          live

        let tick () =
          time := !time + 1;
          thunks := List.filter cleanup !thunks
      end
    end

The `tick` function advances time by doing two things. First,
it increments the current time.
Then, it filters the list of thunks using the `cleanup`
function, which does two things. First, `cleanup` returns
true if its argument is scheduled for the current time or later, and
false if it is older. Since `List.filter` keeps the elements for which
the predicate is true, we only retain thunks in `thunks` which can be
forced now or in the future.

Second, if the argument thunk to `cleanup` is old, it
replaces the code body with an assertion failure, since no
time-correct program should ever force this thunk. Updating the code
ensures that, by construction, next-step thunks *always* lose
their reference to their data once they age out: every
thunk is placed onto `thunks` when it is created, and
when the clock is ticked past its scheduled time, its code is
overwritten.

This guarantees that spacetime leaks are *impossible*, since
we dynamically zero out any thunks that get too old! So here we see
how essential data abstraction is for *imperative* programming,
and not just functional programming.

As you can see, the implementation of the `Next` library is
pretty straightforward. The only mildly clever thing we do is to keep
track of the next-tick computations so we can null them out when they
get too old.
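To watch the nulling-out happen, here is a small sketch that leaves the `Next` module unsealed so the `thunks` list and the `code` field can be inspected from outside. Exposing these internals is an assumption made purely for the demonstration, and in this sketch `cleanup` returns true for thunks that are still live so that `List.filter` keeps exactly those.

```ocaml
(* Unsealed copy of Next so the bookkeeping is visible from outside.
   A real client would only see the abstract NEXT signature. *)
module Next = struct
  let time = ref 0
  type 'a t = { time : int; mutable code : 'a Lazy.t }
  type s = Hide : 'a t -> s
  let thunks : s list ref = ref []
  exception Timing_error of int * int

  let delay f =
    let t = { time = 1 + !time; code = Lazy.from_fun f } in
    thunks := Hide t :: !thunks;
    t

  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code

  (* True for live thunks; old thunks get their code overwritten. *)
  let cleanup (Hide t) =
    let live = t.time >= !time in
    if not live then t.code <- lazy (assert false);
    live

  let tick () =
    incr time;
    thunks := List.filter cleanup !thunks
end

let tracked_at_0, tracked_at_1, value, tracked_at_2, code_dropped =
  let open Next in
  let big = Array.make 1000 0 in
  let x = delay (fun () -> Array.length big) in
  let n0 = List.length !thunks in
  tick (); (* clock = 1: x is live and may be forced *)
  let n1 = List.length !thunks in
  let v = force x in
  tick (); (* clock = 2: x has aged out *)
  let n2 = List.length !thunks in
  (* The record now holds the assertion-failure thunk, not the old code. *)
  let dropped =
    match Lazy.force x.code with
    | _ -> false
    | exception Assert_failure _ -> true
  in
  (n0, n1, v, n2, dropped)
```

After the second tick the thunk is gone from the list and its `code` field no longer points at the original computation, so anything that computation captured is free to be collected.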

You should be wondering now how we can actually write reactive
programs, when the only primitive the API provides lets you schedule a
computation to run on the next tick, and that's it. The answer is
*datatype declarations*. Now that we have a type that lets
us talk about time, we can re-use our host language's facility to
define types which say more interesting things about time.

Let's start with the classic datatype of functional reactive programming: streams. Streams are a kind of lazy sequence, which recursively give you a value now, and a stream starting tomorrow, thereby giving you a value on every time step.

    module Stream : sig
      type 'a stream = Cons of 'a * 'a stream Next.t
      val head : 'a stream -> 'a
      val tail : 'a stream -> 'a stream Next.t
      val unfold : ('a -> 'b * 'a Next.t) -> 'a -> 'b stream
      val map : ('a -> 'b) -> 'a stream -> 'b stream
      val zip : 'a stream * 'b stream -> ('a * 'b) stream
      val unzip : ('a * 'b) stream -> 'a stream * 'b stream
    end = struct

We give a simple signature for streams above. It consists of
a datatype exactly following the English description above,
together with a collection of accessor and constructor functions,
like `head`, `tail`, `map`, `unfold` and so on. All of these have
pretty much the expected types.

The only difference from the usual stream types is that sometimes
we need a `Next.t` to tell us *when* a value
needs to be available. Now let's look at the implementation.

      open Next
      type 'a stream = Cons of 'a * 'a stream Next.t
      let head (Cons(x, _)) = x
      let tail (Cons(_, xs)) = xs

We can write accessor functions for streams, for convenience.

      let map f = fix (fun loop (Cons(x, xs)) -> Cons(f x, loop $ xs))

The `map` function uses the `fix` fixed point
operator in our API, because we want to call the recursive function at
a later time.

      let unfold f =
        fix (fun loop seed ->
          let (x, seed) = f seed in
          Cons(x, loop $ seed))

The unfold function uses a function `f` and an initial
seed value to incrementally produce a sequence of values. This is
exactly like the usual unfold, except we have to use the applicative
interface to the `'a Next.t` type to apply the function.

      let zip pair =
        unfold (fun (Cons(x, xs), Cons(y, ys)) -> ((x, y), Next.zip (xs, ys))) pair

      let unzip xys =
        fix (fun loop (Cons((x,y), xys')) ->
          let (xs', ys') = Next.unzip (loop $ xys') in
          (Cons(x, xs'), Cons(y, ys')))
          xys
    end

`zip` and `unzip` work about the way we'd expect,
in that we use `Next.zip` and `Next.unzip` to
put together and take apart delayed pairs to build the ability to
put together and take apart streams.
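As a quick sanity check of the stream operations, the following sketch zips a counter with its squares and reads off the first few pairs. It carries its own compact copy of `Next` (with a simplified `tick` that only advances the clock), and the `take` driver and the `ints` and `pairs` names are assumptions introduced for the example.

```ocaml
(* Compact copy of the parts of Next used below; tick only advances the
   clock here, and the cleanup list from the post is omitted. *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = 1 + !time; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let zip (r, r') = delay (fun () -> (force r, force r'))
  let ( $ ) f x = delay (fun () -> force f (force x))
  let rec fix f = f (delay (fun () -> fix f))
  let tick () = incr time
end

open Next

type 'a stream = Cons of 'a * 'a stream Next.t

let unfold f =
  fix (fun loop seed ->
    let (x, seed') = f seed in
    Cons (x, loop $ seed'))

let map f = fix (fun loop (Cons (x, xs)) -> Cons (f x, loop $ xs))

(* Stream zip, built from Next.zip on the delayed tails. *)
let zip pair =
  unfold (fun (Cons (x, xs), Cons (y, ys)) -> ((x, y), Next.zip (xs, ys))) pair

let ints n = unfold (fun i -> (i, delay (fun () -> i + 1))) n

(* Hypothetical driver: collect the first n elements, advancing the
   clock once between consecutive reads. *)
let rec take n (Cons (x, xs)) =
  if n <= 0 then []
  else if n = 1 then [x]
  else (tick (); x :: take (n - 1) (force xs))

let pairs = take 3 (zip (ints 0, map (fun i -> i * i) (ints 0)))
```

Both input streams advance in lockstep with the clock, which is why `Next.zip` can force the two tails in the same tick.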

This is all very nice, but the real power of giving a reactive API based on a
next-step type is that we can build types which *aren't* streams. For
example, let's give a datatype of `events`, which is the type
of values which will become available at some point in the future,
but we don't know exactly when.

    module Event : sig
      type 'a event = Now of 'a | Wait of 'a event Next.t
      val map : ('a -> 'b) -> 'a event -> 'b event
      val return : 'a -> 'a event
      val bind : 'a event -> ('a -> 'b event) -> 'b event
      val select : 'a event -> 'a event -> 'a event
    end = struct
      open Next
      type 'a event = Now of 'a | Wait of 'a event Next.t

We represent this with a datatype `'a event`, which has
two constructors. We say that an `'a event` is either a value
of type `'a` available `Now`, or we have to
`Wait` to get another event tomorrow. So this is a
single value of type `'a` that could come at any time ---
and we don't know when!

      let map f =
        fix (fun loop e ->
          match e with
          | Now x -> Now (f x)
          | Wait e' -> Wait (loop $ e'))

We can map over events, by waiting until the value becomes available and then applying a function to the result.

      let return x = Now x

      let bind m f =
        fix (fun bind m ->
          match m with
          | Now v -> f v
          | Wait e' -> Wait (bind $ e'))
          m

Events also form a monad, which corresponds to the ability to sequence promises or futures in the promises libraries you'll find in Javascript or Scala. (The `bind` here is a bit like the `promise.then()` method in JS.)

      let select e1 e2 =
        fix (fun loop e1 e2 ->
          match e1, e2 with
          | Now a1, _ -> Now a1
          | _, Now a2 -> Now a2
          | Wait e1, Wait e2 -> Wait (loop $ e1 $ e2))
          e1 e2
    end

The really cool thing is that we can also join on two events to wait for the first one to complete! This can be extended to lists of events, if desired, but the pattern is easiest to see in the binary case.
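To make the race concrete, here is a sketch that selects between a slow event and a fast one and then drives the result with a tiny event loop. The `after` and `await` helpers and the `winner` binding are hypothetical names introduced for the example, and the `Next` module is again a compact copy whose `tick` only advances the clock.

```ocaml
(* Compact copy of the parts of Next used below (no cleanup list). *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = 1 + !time; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let ( $ ) f x = delay (fun () -> force f (force x))
  let rec fix f = f (delay (fun () -> fix f))
  let tick () = incr time
end

open Next

type 'a event = Now of 'a | Wait of 'a event Next.t

let select e1 e2 =
  fix (fun loop e1 e2 ->
    match e1, e2 with
    | Now a, _ -> Now a
    | _, Now b -> Now b
    | Wait e1', Wait e2' -> Wait (loop $ e1' $ e2'))
    e1 e2

(* Hypothetical helper: an event that fires with v after n ticks. *)
let after n v =
  fix (fun loop n ->
    if n <= 0 then Now v else Wait (loop $ delay (fun () -> n - 1)))
    n

(* Hypothetical event loop: tick until the event fires, returning the
   value together with the number of ticks that were needed. *)
let rec await ticks e =
  match e with
  | Now v -> (v, ticks)
  | Wait next ->
    tick ();
    await (ticks + 1) (force next)

let winner = await 0 (select (after 3 "slow") (after 1 "fast"))
```

The loop stops as soon as either side reports `Now`, so the slower event is never forced past the tick on which the faster one completes.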

Finally, here's a small example of how you can put
this together to actually run a program. The `run`
function gives an event loop that runs for `k` steps and halts,
and prints out the first `k` elements of the stream it gets passed
as an argument.

    module Test = struct
      open Next
      open Stream

      let ints n = unfold (fun i -> (i, delay (fun () -> i+1))) n

      let rec run k xs =
        if k = 0 then ()
        else
          let (x, xs) = (head xs, tail xs) in
          Printf.printf "%d\n" x;
          Runtime.tick ();
          run (k-1) (Runtime.force xs)
    end

I'm confused. It's been a little while since I've written OCaml, but it looks like your Next.fix takes only a single argument of function type, but your Stream.map is calling Next.fix with a two argument function, ie. fix (fun loop (Cons(x, xs)) -> ...)

What am I missing?

Because of currying, a "two argument" function is really a one argument function returning another one argument function. So defining n-ary recursive functions works just fine.

What is possible but annoying to do is to define mutually recursive functions, because then you need to take the fixed point of a whole tuple of functions at once. Semantically it makes perfect sense but the syntax is pretty ugly.

Right, I realized belatedly that it was probably the currying, which I'm just not used to seeing anymore.

While implementing this in C#, it wasn't clear that Next.fix was actually needed for Stream and Event. Next.fix simply recurses indefinitely with a delay at each tick.

Now consider Stream.Select from that link, which is .NET's map function. It invokes Next.Select (map), which itself already calls Next.delay, and the subsequent Next constructed for that Stream element simply calls Stream.Next recursively to continue this process indefinitely. That would seem to replicate the semantics of Next.fix without all the intermediate closures.

Is that sufficient, or am I missing something obvious here too?

Yes, that works fine with the current implementation. I'm personally leery of doing that because in the denotational semantics, the existence of fixed points is due to a property of the next-step type. As a result, it's possible in principle to give an implementation where the native fixed point is not the one you want, and so this leaks information about the implementation strategy. (However, it seems unlikely that you'd ever want such an implementation, so you're almost certainly okay -- it just rubs my semanticist intuition the wrong way.)

C# is already pretty leaky, so that seems like the best choice here. Finally added join with explicit forcing (I think it's correct), and the simple console test works. Very nice!

A question...

It looks like this implementation is geared towards applications which are tightly synchronized, in that all streams change at the same rate. If you have an event source which changes rarely (e.g. a configuration file) then its rate needs to be increased to the rate of the fastest event source.

Does this seem correct to you? There are advantages to synchronized implementations (notably a lot less buffering) but also costs (lots of "my value hasn't changed" events).

Hi Alan, yes, you're right. Type-theoretically, the whole setup is basically the modal mu-calculus with a next-step constructor. This means that there is a single global notion of time which everything is synchronized with respect to. You could call this style of FRP "higher-order synchronous dataflow", if you like.

Unfortunately, most people think synchronous means as in I/O, hence calling it synchronized FRP. The naming of APIs is a difficult matter.

The Oz language has "Declarative variables" which can be bound only once, and act as a sort of first-class futures.

Streams are implemented as a pair of "current value" and "next value" variables, in a way that resembles the NEXT treatment here.