pubsub.co

// server sends messages to workers.
function startServer(messageCount, messageChan) {
  print("server starting");
  var i = 1;
  while (i < messageCount + 1) {
    print("server sending: " + i);
    i -> messageChan;
    print("server sent: " + i);
    i = i + 1;
  }
}

// workers receive messages over a channel, print them.
// and send a ack back to the sender on a channel.
function worker(name, messageChan, ackChan) {
  print("worker " + name + " starting");
  var message = null;
  while (true) {
    message = <- messageChan;
    print("worker " + name + " received: " + message);
    if (message == null) {
      print("worker " + name + " stopped");
      return;
    }
    print("worker " + name + " sending: " + message);
    message -> ackChan;
    print("worker " + name + " sent: " + message);
  }
}

// start workers.
function startWorkers(workerCount, messageChan, ackChan) {
  print("workers starting");
  var i = 1;
  while (i < workerCount + 1) {
    function(name) {
      spawn worker(name, messageChan, ackChan);
    }(i);
    i = i + 1;
  }
  print("workers scheduled to be started");
}

// server waits for acks from workers.
function waitForWorkers(messageCount, ackChan, doneChan) {
  print("server waiting for acks");
  var i = 1;
  var message = null;
  while (i < messageCount + 1) {
    message = <- ackChan;
    print("server received: " + message);
    i = i + 1;
  }
  print("server received all acks");
  null -> doneChan;
}

// stop workers.
function stopWorkers(workerCount, messageChan, doneChan) {
  var done = <- doneChan;
  print("workers stopping");
  var i = 1;
  while (i < workerCount + 1) {
    null -> messageChan;
    i = i + 1;
  }
  print("workers scheduled to be stopped");
}

var workerCount = 3;
var messageCount = 7;
var messageBufferSize = 5;
var ackBufferSize = 1;
var messageChan = newBufferedChannel(messageBufferSize);
var ackChan = newBufferedChannel(ackBufferSize);
var doneChan = newChannel();

startWorkers(workerCount, messageChan, ackChan);
spawn waitForWorkers(messageCount, ackChan, doneChan);
startServer(messageCount, messageChan);
stopWorkers(workerCount, messageChan, doneChan);