Communicating Sequential Processes in a Scheme with Goroutines

A few days ago I posted a little Scheme interpreter; now it’s time to continue this adventure and do some interesting things with it!

If you have played around with Go, you have probably heard about goroutines, channels or communicating sequential processes (CSP). If not, I advise you to read up a little before you continue; it’s worth your time if you want to do concurrent or parallel programming. The essence of CSP is that a process waits at a send or receive operation until “the other end” also requests communication. This is in contrast with other models, where a sender does not wait for its message to be received.
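In Go terms, this rendezvous is what an unbuffered channel provides. The following sketch uses a hypothetical helper, trySend, which attempts a send without blocking (a select with a default case): on an unbuffered channel with nobody receiving, the send cannot complete, while a buffered channel accepts values until its buffer is full.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking
// and reports whether the send could complete right away.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // no receiver ready and no free buffer slot
		return false
	}
}

func main() {
	unbuffered := make(chan int) // same as make(chan int, 0)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: nobody is receiving yet
	fmt.Println(trySend(buffered, 1))   // true: the value waits in the buffer
	fmt.Println(trySend(buffered, 2))   // false: the buffer is full

	// With a receiver waiting at the other end, the unbuffered send succeeds.
	done := make(chan int)
	go func() { done <- <-unbuffered }()
	unbuffered <- 42    // blocks until the goroutine above takes the value
	fmt.Println(<-done) // 42
}
```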

In his 1978 paper, Sir Tony Hoare demonstrates the uses of CSP with a set of interesting examples. What I would like to do here is first add the go expression (and channel primitives) to last time’s scm.go, and then implement a few of these examples.

Adding goroutines and channels to Scheme

The go expression

In Go, you can execute function foo() in a separate goroutine by writing go foo(). The compiler picks up that pattern and generates the code that hands foo() to the Go runtime, which multiplexes its execution onto system threads.
In our Scheme interpreter, we’d like to write an expression (go foo) that makes foo evaluate in a separate goroutine, letting Go do the heavy lifting.

Unfortunately, following our earlier strategy of adding a go procedure to the global environment wouldn’t work. If go were a primitive (or compound) procedure, its arguments would be evaluated before go is applied, leaving only a list of values: not much to put on a goroutine.

Fortunately, there’s a way out: Lisp-based languages offer a few strategies for exactly this kind of problem, expressions whose operands must not be evaluated before the operator is applied. One that we’re not going to use is macros, a language feature that lets you programmatically transform Lisp code into other Lisp code before evaluating it. Macros can define new kinds of expressions in terms of existing ones; this is largely how core.async brings CSP features to Clojure.
We’re going to change the evaluator itself. This makes sense when you want to expose host language features to an interpreter or create a new Lisp. So let’s get on with our little ad-hoc language.

Adding the following snippet to the switch statement in eval is all we need to implement go expressions. It launches the evaluation of the second part of the expression, e[1], in a new goroutine and returns ok without waiting for the result.

		case "go":
			// Evaluate the operand in a new goroutine; don't wait for it.
			go func() { eval(e[1], en) }()
			value = "ok"

Channels

Channels are slightly easier to add: their operators can be integrated into the language as primitive procedures. I’m going to use (-> chan value) for sending and (<- chan) for receiving. These prefix operators resemble the use of channels in Go: the arrow pointing to or from the channel shows the operation.
Constructing a channel takes one argument: the buffer length. In Go, make(chan <type>) creates an unbuffered (synchronous) channel, so it is equivalent to make(chan <type>, 0).

                        // (make-chan n): a channel with buffer length n; n = 0 gives an unbuffered channel
                        "make-chan": func(a ...scmer) scmer {
                                return make(chan scmer, int(a[0].(number)))
                        },
                        // (-> chan value): send; blocks until a receiver is ready, or while the buffer is full
                        "->": func(a ...scmer) scmer {
                                a[0].(chan scmer) <- a[1]
                                return "ok"
                        },
                        // (<- chan): receive; blocks until a value is available
                        "<-": func(a ...scmer) scmer {
                                return <-a[0].(chan scmer)
                        },

You can find the complete interpreter in scm.go’s GitHub repository.

Some examples

Before we dive into the deeper stuff, let’s try some basics in the REPL.

> (define fact (lambda (n) (if (<= n 1) 1 (* n (fact (- n 1))))))
==> ok
> (define chan (make-chan 0))
==> ok
> (go (begin (fact 1e5) (-> chan 1)))
==> ok

The machine is now calculating the factorial of 100 000 in a new goroutine in the background. We can do something else in the REPL, and later check whether the factorial has been calculated by receiving on the channel. This blocks until the factorial goroutine puts 1 on chan, signaling that it is done.
Of course, this factorial won’t take much time on a modern computer, and (fact 1e6) ends with a goroutine stack overflow, so to make the wait noticeable you can add a few more (fact 1e5) calls to the begin expression: (go (begin (fact 1e5) (fact 1e5) (fact 1e5) (-> chan 1))).

> (quote something-else)
==> something-else
> (<- chan)

Here we wait…

==> 1
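For comparison, here is the same background-work pattern in plain Go, as a sketch: fact is a hypothetical helper built on math/big’s Int.MulRange (so the result doesn’t overflow), and n = 20 keeps the printed result short.

```go
package main

import (
	"fmt"
	"math/big"
)

// fact computes n! with arbitrary precision; MulRange(1, 0) yields 1.
func fact(n int64) *big.Int {
	return new(big.Int).MulRange(1, n)
}

func main() {
	done := make(chan int) // unbuffered, like (make-chan 0)
	var result *big.Int

	// Like (go (begin (fact ...) (-> chan 1))): compute, then signal completion.
	go func() {
		result = fact(20)
		done <- 1
	}()

	fmt.Println("something-else") // the main goroutine is free meanwhile

	<-done              // like (<- chan): block until the worker signals
	fmt.Println(result) // 2432902008176640000
}
```

The receive on done also guarantees that the write to result is visible to the main goroutine, so no extra locking is needed.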

And go on to more interesting stuff!

Communicating Sequential Processes

The inspiration for this post came from Thomas Kappler’s implementation of CSP in Go, which was useful in understanding the original CSP paper. Like him, I was particularly interested in the examples of an iterative array to calculate factorials, and the highly concurrent matrix multiplication. Other examples, such as a concurrent version of Eratosthenes’ prime sieve, have already been covered in a multitude of other languages.

The iterative array

Section 4.2 goes: “Problem: Compute a factorial by the recursive method, to a given limit”. The solution given in the paper’s notation is:

[fac(i:1..limit)::
*[n:integer; fac(i-1)?n->
  [n=0 -> fac(i-1)!1
   |n>0 -> fac(i+1)!n-1;
    r:integer; fac(i+1)?r; fac(i-1)!(n*r)
  ]
 ]
||fac(0)::USER
]

Here a!b means “send the value of expression b to process a”, and c?d means “receive a value from process c and store it in target variable d”. In the CSP examples, this communication literally is the input and output of a process, and there is always exactly one receiver or sender (another process) on such an I/O channel. This differs from what we know from Go, where chans are named values, separate from the processes that use them, and where multiple senders and receivers are allowed. Hoare touches upon this possible extension in section 7.3 of the CSP paper, Port Names.

In the code, Hoare defines a series of limit processes, each specified by a repetitive command *[...]. These processes are named fac(1) to fac(limit), and the parallel command [...||...] runs them all simultaneously, together with the user process fac(0). Each process declares a local variable n and fills it with the output of its neighbor process fac(i-1). If this yields zero, fac(i) sends 1 right back to fac(i-1). Otherwise, it sends n-1 to its other neighbor fac(i+1) and waits for the answer, which it stores in local variable r. After receiving it, fac(i) sends n*r to fac(i-1).
The factorial is thus calculated by a chain of processes, each communicating only with its immediate neighbors. This is a very unrealistic example, but for all it matters, the processes could be running on different CPUs in a server farm, each calculating part of a much more complex problem. Things can quickly get very interesting. Calculation is started by the user process fac(0), which sends its input to fac(1), and ends when the factorial is sent back to that same process.
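A fairly literal Go transcription of Hoare’s array, as a sketch: Go processes cannot name each other, so the assumption here is a hypothetical slice link of unbuffered channels, where link[i] connects fac(i) with fac(i+1) and link[0] is where the user process fac(0) talks to fac(1). The guard n = 0 is kept as in the paper, so computing x! needs limit = x+1 processes.

```go
package main

import "fmt"

// facArray mirrors [fac(i:1..limit):: ... || fac(0)::USER].
// link[i] is a (hypothetical) channel between fac(i) and fac(i+1).
func facArray(x int) int {
	limit := x + 1 // fac(x+1) receives 0 and answers 1
	link := make([]chan int, limit+1)
	for i := range link {
		link[i] = make(chan int) // unbuffered: CSP-style rendezvous
	}
	for i := 1; i <= limit; i++ {
		go func(left, right chan int) { // process fac(i)
			for { // *[ ... ]: repeat forever
				n := <-left // fac(i-1)?n
				if n == 0 {
					left <- 1 // fac(i-1)!1
				} else {
					right <- n - 1 // fac(i+1)!n-1
					r := <-right   // fac(i+1)?r
					left <- n * r  // fac(i-1)!(n*r)
				}
			}
		}(link[i-1], link[i])
	}
	link[0] <- x     // USER sends the input to fac(1)...
	return <-link[0] // ...and receives x! back
}

func main() {
	fmt.Println(facArray(9)) // 362880
}
```

As in the paper, the processes never terminate: each call leaves its goroutines blocked on a receive, which is fine for a demonstration but would need a shutdown mechanism in a longer-lived program. Plain int also limits x to 20 on 64-bit platforms.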

As I said before, CSP processes communicate directly with each other by naming one another. This is possible because a process’ scope extends over the whole parallel command. In scm.go, we’re going to use named ports (unbuffered Go chans) instead, which essentially means that communication is routed through pairs of port variables. Since scm.go is a lexically scoped language, this gives functionally equivalent, though much more readable code. In this text, the words port and unbuffered channel are used interchangeably.
Please allow a slight renaming of the CSP processes fac(i-1) and fac(i+1) to port variables left and right. We’re later going to chain these loops as processes (goroutines) communicating on those two ports.

Figure: connections in the chain of processes

An equivalent of the loop running in each process (denoted by *[...] above) can then be expressed in scm. Since scm.go doesn’t have any looping constructs, the procedure fac-i calls itself recursively upon completion, effectively starting over. Unlike Hoare’s version, fac-i ends the recursion at n = 1 instead of n = 0.

(define fac-i (lambda (left right)
	(begin
		(define n (<- left))
		(if (equal? n 1)
			(-> left 1)
			(begin
				(-> right (- n 1))
				(-> left (* n (<- right)))))
		(fac-i left right))))

We’ll build the chain of processes from right to left, creating the rightmost process, fac(limit), first. Every other fac(i), down to fac(1), is chained onto the left of the rest. Once the chain is complete, we communicate with its leftmost port.

We define a recursive procedure make-facchain that recurses i times to create the communicating structure depicted above. Its second argument is an accumulator holding the leftmost port of what’s already built. This accumulator is passed on in the tail call, so each level of recursion holds the port to a longer chain of fac-is.

(define make-facchain (lambda (i acc)
	(if (equal? i 0)
		acc
		(make-facchain (- i 1) (chain-left fac-i acc)))))

The actual chaining of a new two-port process onto acc happens as follows. Each time chain-left is called, a new channel is created; it becomes the left port of the new process, and is also the return value of the procedure. The second argument of chain-left is connected to the process’ right port. After connecting, the procedure accepting the two ports is run in a new goroutine, where it can use them to communicate with its neighbors.

(define chain-left (lambda (procedure right)
	(begin
		(define left (make-chan 0))
		(go (procedure left right))
		left)))

This is all the machinery we need to create the iterative array from CSP. I wrote a small function fac that takes a number, creates a long enough chain, sends the number to its (left) port, and returns what comes back.
Note that fac(limit) never uses its right port, the rightmost port of the whole chain. We can therefore start make-facchain with the accumulator being just about anything, in this case the number 0.

(define fac (lambda (x)
	(begin
		(define facport (make-facchain (+ x 1) 0))
		(-> facport x)
		(<- facport))))

Behold, running scm.go:

> (define fac-i (lambda (left right) (begin (define n (<- left)) (if (equal? n 1) (-> left 1) (begin (-> right (- n 1)) (-> left (* n (<- right))))) (fac-i left right))))
==> ok
> (define make-facchain (lambda (i acc) (if (equal? i 0) acc (make-facchain (- i 1) (chain-left fac-i acc)))))
==> ok
> (define chain-left (lambda (procedure right) (begin (define left (make-chan 0)) (go (procedure left right)) left)))
==> ok
> (define fac (lambda (x) (begin (define facport (make-facchain (+ x 1) 0)) (-> facport x) (<- facport))))
==> ok
> (fac 9)
==> 362880
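The same construction translates almost line by line into Go. In this sketch, facI, chainLeft, makeFacchain and fac are hypothetical Go counterparts of the Scheme procedures; one Go-specific choice is to pass a nil channel as the unused rightmost port, since fac(limit) never sends on it.

```go
package main

import "fmt"

// facI is the loop of each process, with the same n = 1 base case as fac-i.
func facI(left, right chan int) {
	for {
		n := <-left
		if n == 1 {
			left <- 1
		} else {
			right <- n - 1
			left <- n * <-right // wait for the right neighbor's answer
		}
	}
}

// chainLeft creates the new process's left port, starts the process on
// (left, right) in a goroutine, and returns left.
func chainLeft(procedure func(left, right chan int), right chan int) chan int {
	left := make(chan int)
	go procedure(left, right)
	return left
}

// makeFacchain builds the chain right to left; acc is the leftmost port so far.
func makeFacchain(i int, acc chan int) chan int {
	if i == 0 {
		return acc
	}
	return makeFacchain(i-1, chainLeft(facI, acc))
}

func fac(x int) int {
	facport := makeFacchain(x+1, nil) // nil: the rightmost port is never used
	facport <- x
	return <-facport
}

func main() {
	fmt.Println(fac(9)) // 362880
}
```

Like fac-i, this only works for x ≥ 1: for x = 0 the single process receives 0, misses the n = 1 base case and tries to send -1 to its right port (nil here, so the program deadlocks; in the Scheme version, -> would fail because 0 is not a channel). Testing for n == 0 instead, as Hoare does, fixes this, since the chain already has x+1 processes.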

A concurrent matrix multiplication

From CSP section 6.2, An Iterative Array: Matrix Multiplication:

Problem: A square matrix A of order 3 is given. Three streams are to be input, each stream representing a column of an array IN. Three streams are to be output, each representing a column of the product matrix IN × A. After an initial delay, the results are to be produced at the same rate as the input is consumed. Consequently, a high degree of parallelism is required. The solution should take the form shown in Figure 2. Each of the nine nonborder nodes inputs a vector component from the west and a partial sum from the north. Each node outputs the vector component to its east, and an updated partial sum to the south. The input data is produced by the west border nodes, and the desired results are consumed by south border nodes. The north border is a constant source of zeros and the east border is just a sink. No provision need be made for termination nor for changing the values of the array A.

Figure 2

Solution: There are twenty-one nodes, in five groups, comprising the central square and the four borders:

[M(i:1..3,0)::WEST
||M(0,j:1..3)::NORTH
||M(i:1..3,4)::EAST
||M(4,j:1..3)::SOUTH
||M(i:1..3,j:1..3)::CENTER
]

The WEST and SOUTH borders are processes of the user program; the remaining processes are:

NORTH = *[true -> M(1,j)!0]
EAST = *[x:real; M(i,3)?x -> skip]
CENTER = *[x:real; M(i,j-1)?x ->
    M(i,j+1)!x; sum:real;
    M(i-1,j)?sum; M(i+1,j)!(A(i,j)*x + sum)]

We’ll first define these last three processes, which are all infinite loops. Their ports are named after their position relative to the node in the matrix; e.g. the north loop continuously sends 0 to its south port.

(define north-loop (lambda (south)
	(begin
		(-> south 0)
		(north-loop south))))

(define east-loop (lambda (west)
	(begin
		(<- west)
		(east-loop west))))
		
(define center-loop (lambda (north east south west aij)
	(begin
		(define x (<- west))
		(-> east x)
		(-> south  (+ (* aij x) (<- north)))
		(center-loop north east south west aij))))

As in the previous example, it would be possible to have the loops’ scope extend over the whole parallel command, or to handle the connections with list lookups. Instead, I’m going to construct the square matrix recursively in Γ-shaped pieces, starting from the north-west and ending in the south-east. Each successive (smaller) piece has its north and west inputs connected to, respectively, the south and east outputs of the preceding level of recursion. The figure shows the first piece (center, east-wing and south-wing) of our 3×3 matrix.

Whatever the size of the matrix, this process ends by creating the one-node south-east piece, whose inputs are the east and south ports of the previous piece. The east output of this last piece gets connected to an east-loop process. The south port of this node, like the south-most ports of all other pieces, is one we actually care about: in this 3×3 case it produces the 3rd column of the matrix product.

We can launch each center node with its north and west input ports as arguments, as in procedure make-center. This procedure creates and connects the node’s east and south output ports, and returns them in a cons. I define east and south as shorthands for retrieving these.

(define make-center (lambda (north west aij)
	(begin
		(define east (make-chan 0))
		(define south (make-chan 0))
		(go (center-loop north east south west aij))
		(cons east south))))

(define east (lambda (center)
	(car center)))

(define south (lambda (center)
	(car (cdr center))))
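A single CENTER node can also be tried out on its own. In this Go sketch, centerLoop and makeCenter are hypothetical counterparts of center-loop and make-center, and a small struct stands in for the cons holding the east and south ports.

```go
package main

import "fmt"

// ports replaces the (cons east south) pair returned by make-center.
type ports struct {
	east, south chan int
}

// centerLoop: take x from the west, pass it east, add aij*x to the
// partial sum from the north and pass the result south; repeat.
func centerLoop(north, east, south, west chan int, aij int) {
	for {
		x := <-west
		east <- x
		south <- aij*x + <-north
	}
}

// makeCenter creates the output ports, starts the node, and returns the outputs.
func makeCenter(north, west chan int, aij int) ports {
	p := ports{east: make(chan int), south: make(chan int)}
	go centerLoop(north, p.east, p.south, west, aij)
	return p
}

func main() {
	north, west := make(chan int), make(chan int)
	node := makeCenter(north, west, 3) // aij = 3

	west <- 2                 // x = 2 arrives from the west...
	fmt.Println(<-node.east)  // 2: ...and is passed on to the east
	north <- 5                // partial sum 5 arrives from the north
	fmt.Println(<-node.south) // 11 = 3*2 + 5
}
```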

The creation of Γ-pieces is part of the recursive procedure make-matrix. This procedure takes two series of input ports (north-ports and west-ports), and accumulates the output ports in south-ports. Matrix A and its size are also supplied.

A corner node is simply a center node holding a diagonal element of A. The east-wing and south-wing, connected to this corner node, take up the rest of the supplied input ports, and produce the input ports for the next iteration.
chain-east also makes sure the east-most port of the east-wing is connected to an east-loop, while chain-south returns the south-most (output) port of the south-wing for accumulation in south-ports.

The series of ports in make-matrix’s arguments are lists, or rather queues, of ports: their contents are read in the same order (FIFO) as they were put in by the previous level. The west-most and north-most ports are created, and consumed, first; the east-most and south-most ports last. There is no queue in standard Scheme, and I refrained from implementing one, because we just introduced buffered channels, which are in fact queues of finite length. For buffered channels, the -> and <- operators are equivalent to push and pop (blocking when the queue is full or empty, respectively), though you can still call them send and receive for ports.
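Here is a small Go sketch of that queue discipline, with a buffered channel of channels standing in for a queue of ports, roughly what add-port builds. Since all pushes in make-matrix happen in the same goroutine before the next level pops anything, a queue must have room for every port pushed onto it; that is why eastwing-ports and southwing-ports get a buffer length of i-1, the number of nodes in each wing.

```go
package main

import "fmt"

func main() {
	queue := make(chan chan int, 3) // room for 3 ports, like (make-chan 3)

	ports := make([]chan int, 3)
	for i := range ports {
		ports[i] = make(chan int) // an unbuffered port, like (make-chan 0)
		queue <- ports[i]         // push, like (-> queue port)
	}
	// A 4th push here would block forever: the buffer is full and,
	// in this single goroutine, nobody could pop to make room.

	for i := range ports {
		p := <-queue               // pop, like (<- queue)
		fmt.Println(p == ports[i]) // true: first in, first out
	}
}
```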

The queues of ports for the next recursion level are created in make-matrix and filled up in chain-east and chain-south. The south-most port of chain-south is eventually pushed onto south-ports, which is also a queue.

(define make-matrix (lambda (north-ports west-ports south-ports i a)
	(if (null? a)
		south-ports
		(begin
			(define corner (make-center (<- north-ports) (<- west-ports) (car (car (car a)))))
			(define eastwing-ports (make-chan (- i 1)))
			(define southwing-ports (make-chan (- i 1)))
			(chain-east (east corner) north-ports eastwing-ports (cdr (car (car a))))
			(-> south-ports (chain-south (south corner) west-ports southwing-ports (car (cdr (car a)))))
			(make-matrix eastwing-ports southwing-ports south-ports (- i 1) (cdr a))))))

Chaining the matrix nodes looks a lot like the chaining of factorial nodes above, only four instead of two ports are connected at each recursion level, and a row or column of A has to be supplied. In the east-wing, north-ports are popped from the queue while south-ports are pushed onto the accumulator, and subsequent nodes are chained to the east. In the south-wing, the analogue happens southwards.

(define chain-east (lambda (west north-ports south-ports ai)
	(if (null? ai)
		(go (east-loop west))
		(begin
			(define cur (make-center (<- north-ports) west (car ai)))
			(-> south-ports (south cur))
			(chain-east (east cur) north-ports south-ports (cdr ai))))))

(define chain-south (lambda (north west-ports east-ports aj)
	(if (null? aj)
		north
		(begin
			(define cur (make-center north (<- west-ports) (car aj)))
			(-> east-ports (east cur))
			(chain-south (south cur) west-ports east-ports (cdr aj))))))

Matrix A is passed to make-matrix as a list, such that at every level of recursion the top row and left column of the remaining matrix are in the car. At iteration i, this means that (car a) is a pair of two lists: (car (car a)) contains the elements A(i,i) to A(i,n), and (car (cdr (car a))) the elements A(i+1,i) to A(n,i).
For example, the 3×3 matrix with elements A(i,j) = 10i+j becomes the list (((11 12 13) (21 31)) ((22 23) (32)) ((33) ()))

Of course, if this particular format is inconvenient for the user, it’s possible to write a small list-processing routine that transforms user input into this internal data structure.
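As a sketch of such a transformation, here is a Go function (gammaList, a hypothetical name) that takes A as a row-major slice and prints the nested list in the Γ format used by make-matrix, ready to be pasted into a (quote ...) expression.

```go
package main

import (
	"fmt"
	"strings"
)

// list renders integers as a Scheme list, e.g. (21 31) or ().
func list(xs []int) string {
	parts := make([]string, len(xs))
	for i, x := range xs {
		parts[i] = fmt.Sprint(x)
	}
	return "(" + strings.Join(parts, " ") + ")"
}

// gammaList converts a row-major square matrix into make-matrix's format:
// one ((A(i,i) ... A(i,n)) (A(i+1,i) ... A(n,i))) pair per Γ-shaped level.
func gammaList(a [][]int) string {
	n := len(a)
	levels := make([]string, n)
	for i := 0; i < n; i++ {
		row := a[i][i:] // top row of the remaining matrix, corner first
		col := make([]int, 0, n-i-1)
		for k := i + 1; k < n; k++ {
			col = append(col, a[k][i]) // left column below the corner
		}
		levels[i] = "(" + list(row) + " " + list(col) + ")"
	}
	return "(" + strings.Join(levels, " ") + ")"
}

func main() {
	a := [][]int{{11, 12, 13}, {21, 22, 23}, {31, 32, 33}}
	fmt.Println(gammaList(a)) // (((11 12 13) (21 31)) ((22 23) (32)) ((33) ()))
}
```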

As an example, I’ll now run this multiplication. We need to supply queues of north and west input ports to make-matrix, so we define the north border nodes and the input ports:

(define add-port (lambda (queue)
	(begin
		(define port (make-chan 0))
		(-> queue port)
		port)))

(define make-northwing (lambda (i)
	(begin
		(define south-ports (make-chan i))
		(define add-northloops (lambda (i)
			(if (equal? i 0)
				0
				(begin
					(define south (add-port south-ports))
					(go (north-loop south))
					(add-northloops (- i 1))))))
		(add-northloops i)
		south-ports)))

(define north-ports (make-northwing 3))

(define west-ports (make-chan 3))
(define x (add-port west-ports))
(define y (add-port west-ports))
(define z (add-port west-ports))

It’s also time to define matrix A, and null?, which scm.go didn’t have yet.

(define a (quote
	(((11 12 13) (21 31)) ((22 23) (32)) ((33) ()))
	))

(define null? (lambda (lis)
	(equal? lis (quote ()))))

Ready to launch the whole thing?

(define multiplicator (make-matrix north-ports west-ports (make-chan 3) 3 a))

(define xo (<- multiplicator))
(define yo (<- multiplicator))
(define zo (<- multiplicator))

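Now we can feed one value into each of the three input streams (one row of IN at a time) and receive the corresponding rows of IN × A at the same rate. Let’s do it by hand. Launching scm.go with the example file piped in ahead of standard input,

	$ cat examples/6.2.scm - | go run scm.go

we can type: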

> (begin (-> x 1) (-> y 2) (-> z 3))
==> ok
> (<- xo)
==> 28
> (<- yo)
==> 33
> (<- zo)
==> 29
> (begin (-> x 3) (-> y 2) (-> z 1))
==> ok
> (<- xo)
==> 28
> (<- yo)
==> 31
> (<- zo)
==> 31
> (begin (-> x 2) (-> y 1) (-> z 3))
==> ok
> (<- xo)
==> 26
> (<- yo)
==> 33
> (<- zo)
==> 31