// Server: sends the integers 1 through messageCount, in ascending order,
// on messageChan.
// A line is printed immediately before and immediately after every send, so
// the log shows when each send was attempted and when it completed.
//   messageCount - how many messages to send; the loop body never runs when
//                  this is less than 1.
//   messageChan  - channel the messages are sent on.
// NOTE(review): presumably a send blocks while messageChan is full, which
// would explain the separate "sending"/"sent" lines - confirm against the runtime.
function startServer(messageCount, messageChan) {
print("server starting");
var i = 1;
// i doubles as the loop counter and the message payload.
while (i < messageCount + 1) {
print("server sending: " + i);
i -> messageChan;
print("server sent: " + i);
i = i + 1;
}
}
// Worker: receives messages from messageChan in a loop, prints each one,
// and sends it back on ackChan as an acknowledgement to the sender.
// A null message is the stop signal: the worker prints that it has stopped
// and returns without sending an ack for it.
//   name        - label used to identify this worker in its log lines.
//   messageChan - channel the messages are received from.
//   ackChan     - channel every non-null message is echoed back on.
function worker(name, messageChan, ackChan) {
print("worker " + name + " starting");
var message = null;
while (true) {
message = <- messageChan;
// The "received" line is printed before the null check, so the stop signal
// itself is logged as a received message too.
print("worker " + name + " received: " + message);
if (message == null) {
print("worker " + name + " stopped");
return;
}
print("worker " + name + " sending: " + message);
// The ack carries the same value that was received.
message -> ackChan;
print("worker " + name + " sent: " + message);
}
}
// Starts workerCount workers, named 1 through workerCount, each spawned with
// the same messageChan and ackChan.
// This function only spawns the workers and then returns; it contains no
// step that waits for them (hence the final "scheduled to be started" line).
//   workerCount - number of workers to spawn; none are spawned when it is
//                 less than 1.
//   messageChan - channel passed to every worker as its message source.
//   ackChan     - channel passed to every worker for its acks.
function startWorkers(workerCount, messageChan, ackChan) {
print("workers starting");
var i = 1;
while (i < workerCount + 1) {
// The counter is handed to an anonymous function that is called right away,
// so the spawned worker is given the parameter `name` instead of `i` itself.
// NOTE(review): presumably this protects the worker's name from later
// updates to i - confirm against how spawn evaluates its arguments.
function(name) {
spawn worker(name, messageChan, ackChan);
}(i);
i = i + 1;
}
print("workers scheduled to be started");
}
// Server side of the ack protocol: receives exactly messageCount acks from
// ackChan, printing each one, then signals completion by sending null on
// doneChan.
//   messageCount - number of acks to receive before signalling completion.
//   ackChan      - channel the acks are received from.
//   doneChan     - channel the completion signal is sent on.
function waitForWorkers(messageCount, ackChan, doneChan) {
print("server waiting for acks");
var i = 1;
var message = null;
// Acks are only counted; their values are printed but not checked or matched
// against the messages that were sent.
while (i < messageCount + 1) {
message = <- ackChan;
print("server received: " + message);
i = i + 1;
}
print("server received all acks");
// The value sent is just a placeholder; stopWorkers receives it but never
// reads it.
null -> doneChan;
}
// Stops the workers: first receives one value from doneChan, then sends
// workerCount null messages on messageChan. worker() treats a null message
// as its stop signal, so one null is sent per worker.
// This function does not wait for the workers to actually exit (hence the
// final "scheduled to be stopped" line).
//   workerCount - number of null stop signals to send.
//   messageChan - channel the stop signals are sent on.
//   doneChan    - channel received from once before any stop signal is sent.
function stopWorkers(workerCount, messageChan, doneChan) {
// Only the arrival of a value matters here; `done` is never read afterwards.
var done = <- doneChan;
print("workers stopping");
var i = 1;
while (i < workerCount + 1) {
null -> messageChan;
i = i + 1;
}
print("workers scheduled to be stopped");
}
// Script entry point: configures the demo and wires the pieces together.
var workerCount = 3;
var messageCount = 7;
// NOTE(review): presumably the argument to newBufferedChannel is the channel's
// capacity, and newChannel() creates an unbuffered channel - confirm against
// the runtime's documentation.
var messageBufferSize = 5;
var ackBufferSize = 1;
var messageChan = newBufferedChannel(messageBufferSize);
var ackChan = newBufferedChannel(ackBufferSize);
var doneChan = newChannel();
// The workers and the ack collector are started before the first message is
// sent.
startWorkers(workerCount, messageChan, ackChan);
spawn waitForWorkers(messageCount, ackChan, doneChan);
// startServer is called directly rather than spawned, so stopWorkers is not
// reached until every message has been sent.
startServer(messageCount, messageChan);
// stopWorkers receives from doneChan (sent by waitForWorkers after the last
// ack) before sending the stop signals.
stopWorkers(workerCount, messageChan, doneChan);