Sterling has too many projects

Blogging about Raku programming, microcontrollers & electronics, 3D printing, and whatever else...

»

In Raku we have a couple of basic ways of getting at the events emitted by a Supply, which raises the question: what’s the difference between them? I want to answer that question by creating a react block with a couple of intervals and then emulating the same basic functionality using tap.

Let’s start with our base react block:

sub seconds { state $base = now; now - $base }
react {
    say "REACT 1: {seconds}";

    whenever Supply.interval(1) {
        say "INTERVAL 1-$_: {seconds}";
        done if $_ > 3;
    }

    say "REACT 2: {seconds}";

    whenever Supply.interval(0.5) {
        say "INTERVAL 2-$_: {seconds}";
    }

    say "REACT 3: {seconds}";
}

The seconds routine is just a helper to give us the time in seconds since the start of the block. The output from this block will typically be similar to this:

REACT 1: 0.0011569
REACT 2: 0.0068571
REACT 3: 0.008015
INTERVAL 1-0: 0.0092906
INTERVAL 2-0: 0.0101116
INTERVAL 2-1: 0.5103139
INTERVAL 1-1: 1.007995
INTERVAL 2-2: 1.022309
INTERVAL 2-3: 1.5124228
INTERVAL 1-2: 2.0137509
INTERVAL 2-4: 2.014717
INTERVAL 2-5: 2.517795
INTERVAL 1-3: 3.016291
INTERVAL 2-6: 3.0182612
INTERVAL 2-7: 3.521018
INTERVAL 1-4: 4.0182113

So what does this mean? The first thing to note is that all the code in the react block itself runs first. That is, it runs all the statements, including each whenever statement, which registers an event tap on its Supply without running the block’s code yet. Once the react block finishes this setup, it blocks until either all the whenever supplies are done or a done statement is encountered. At that point, all the supplies are untapped and execution continues.

By the way, if you want some code to run after a react block has completed (or a supply block, for that matter), you can use the special CLOSE phaser. A LEAVE phaser, by contrast, will run as soon as the code in the react block finishes setting up the reaction.
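Here is a minimal sketch of the two phasers side by side, with comments reflecting the timing just described:

react {
    LEAVE say "LEAVE: setup code finished";  # runs once the setup pass completes
    CLOSE say "CLOSE: react block is done";  # runs after the react block finishes
    whenever Supply.interval(1) { done if $_ >= 2 }
}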

Aside from that, it must be noted that everything related to the react block runs in sequence. Raku doesn’t promise to run it all on a single thread, but it does promise that no two parts of the code inside a react block will run concurrently. This includes the initial pass through the react block itself as well as the execution of the whenever blocks in reaction to values emitted to their supplies.

So, how would we go about getting this behavior using .tap? We could do it like this:

sub seconds { state $base = now; now - $base }
REACT: {
    say "REACT 1: {seconds}";

    my $ready = Promise.new;
    my $mutex = Lock.new;
    my $finished = my $done = Promise.new;

    my $interval1 = Supply.interval(1).tap: {
        await $ready;
        $mutex.protect: {
            say "INTERVAL 1-$_: {seconds}";
            $done.keep if $_ > 3;
        }
    }

    $finished .= then: -> $p {
        $interval1.close;
    }

    say "REACT 2: {seconds}";

    my $interval2 = Supply.interval(0.5).tap: {
        await $ready;
        $mutex.protect: {
            say "INTERVAL 2-$_: {seconds}";
        }
    }

    $finished .= then: -> $p {
        $interval2.close;
    }

    say "REACT 3: {seconds}";

    $ready.keep;
    await $finished;
}

This is similar to what the react block is actually doing, but with several additional manual steps. First we must prepare a couple of promises. The $ready Promise is kept at the end of the “REACT” block to release the taps to do their work. The $done Promise holds the main thread until execution is complete; $finished starts out as that same promise and then has the tap-closing cleanup chained onto it with .then, which is why we await $finished at the end.

I have not implemented the additional logic of automatically keeping $done once all the supplies are done. One way to do that would be to create another Promise for each tap, keep it from that tap’s done handler, and attach a .then block to a Promise.allof() over all those promises. I leave solving that as an exercise for the reader.

The other major addition is the $mutex Lock object. This prevents the individual tap blocks from running simultaneously.

That should be enough. This is probably not the most efficient solution, but it does demonstrate the extra help the react block gives you. You may notice that the tap version is ever so slightly faster. This should not be a surprise: the tap version is not taking as much care with coordination as the react block. Therefore, if eking out a few extra milliseconds matters to your code, you may want to consider implementing your async coordination directly using tap and some other tools rather than a react block. Be aware, however, that the react block is likely saving you a pile of debugging headaches by handling all those fiddly little details for you.

And one final note: the documentation of the act method states that it works like tap, but the given code is executed by only one thread at a time. I’m really uncertain what this means, because the same basic guarantee appears to be inherent to tap as well: a Supply is unable to continue with another emitted message until all taps have finished running, and in practice taps all run synchronously for each message too. I haven’t found any evidence in all my work that taps on a given supply ever run concurrently. Anyway, if someone can go to the Reddit thread for this post and explain what the actual difference is between tap and act, I would appreciate it.

Cheers.

»

A semaphore is a system of sending messages using flags. Oh wait, that’s what a semaphore is outside of computing. In computing, a semaphore is a kind of lock that only blocks after being acquired N times. This is useful for situations where you have a resource of N items, want to quickly hand them out while some are available, and then immediately block until a resource has been released. Raku provides a built-in Semaphore class for this purpose:

class ConnectionPool {
    has @.connections;
    has Semaphore $!lock;

    submethod BUILD(:@!connections) {
        $!lock .= new(@!connections.elems);
    }

    method use-connection() {
        $!lock.acquire;
        pop @!connections;
    }

    method return-connection($connection) {
        push @!connections, $connection;
        $!lock.release;
    }
}

Here we have a connection pool where we can quickly and safely pull entries out of the stack of connections. However, as soon as the last connection has been pulled, the .use-connection method will block until a connection is returned using .return-connection.
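A usage sketch, assuming a hypothetical make-connection routine that produces individual connections:

my $pool = ConnectionPool.new(connections => (^5).map({ make-connection() }).List);
my $conn = $pool.use-connection;    # blocks if all 5 connections are in use
# ... work with $conn ...
$pool.return-connection($conn);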

There is an additional .try_acquire method that can be used instead of .acquire, which returns a Bool indicating success or failure. For example, we might have a buffer for key presses that we want to fail if it fills up, rather than continuing to store key events:

class KeyBuffer {
    has UInt $.size;
    has UInt $!read-cursor = 0;
    has UInt $!write-cursor = 0;
    has byte @!key-buffer;
    has Semaphore $!buffer-space;
    has Semaphore $!lock .= new(1);

    submethod BUILD(UInt :$!size) {
        @!key-buffer = 0 xx $!size;
        $!buffer-space .= new($!size);
    }

    method !increment-cursor($cursor is rw) {
        $cursor++;
        $cursor %= $!size;
    }

    method store(byte $key) {
        $!buffer-space.try_acquire or die "buffer is full!";

        $!lock.acquire;
        LEAVE $!lock.release;

        @!key-buffer[ $!write-cursor ] = $key;
        self!increment-cursor($!write-cursor);
    }

    method getc(--> byte) {
        my $result = 0;

        $!lock.acquire;
        LEAVE $!lock.release;

        if $!read-cursor != $!write-cursor {
            $result = @!key-buffer[ $!read-cursor ];
            self!increment-cursor($!read-cursor);

            $!buffer-space.release;
        }
        
        $result;
    }
}

This data structure uses two Semaphores. One, named $!lock, is used in the same way a Lock works to guard the critical sections and make sure they are atomic. The other, $!buffer-space, is used to make sure the write operations fail when the buffer fills up.

As you can see, we use .try_acquire to acquire a resource from the Semaphore. If that method returns False, we throw an exception to let the caller know the operation failed. If it returns True, we have acquired permission to add another entry to the buffer. When we read from the buffer, we use .release to mark the space available again.

I’ve used a Semaphore for the mutual exclusion lock because it can be used that way and that’s what we’re talking about. However, the protect method of Lock or Lock::Async may be a better choice here: with .protect you don’t need to be careful to make sure .release gets called, because the block takes care of that for you. With that said, a LEAVE phaser is a good way to make sure .release is called, since LEAVE phasers run no matter how the block exits (i.e., even on an exception).
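For comparison, here is a small sketch of both ways of guarding a critical section:

my Lock $lock .= new;
my $count = 0;

# .protect releases the lock for you, even if the block throws
$lock.protect: { $count++ };

# manual locking with a LEAVE phaser gives the same guarantee
{
    $lock.lock;
    LEAVE $lock.unlock;   # runs no matter how the block exits
    $count++;
}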

It should be noted that if an exception happens in the .getc method above after $!read-cursor is incremented but before $!buffer-space.release is called, the buffer could end up in a bad state where it has permanently lost capacity. An improvement worth making, if such an exception is possible, is to catch and deal with exceptions inside that if-block.

A general thing to keep in mind is that whenever dealing with concurrency, the seemingly trivial edge cases can easily become important. Sometimes it becomes important in unforeseen ways.

Cheers.

»

One challenge I’ve often faced when writing asynchronous code is trying to figure out how to break the problem down in a reasonable way. How far do I go in breaking up my code? How many steps do I want to take? How do I deal with tasks that branch or fork? How do I deal with the interdependencies I’ve created? My hope in this article is to share some guidelines I’ve learned and some tools for answering these questions.

This advent calendar post will be focused on async problems. Later in the calendar, I come back and consider the same with a focus on concurrency.

The Special Character of Async Problems

This problem is different in character, but not in real substance, from more traditional programming problems. Just as you break traditional software into pieces using functions, methods, and subroutines, you largely do the same with async.

So what is the special “character” that sets these async problems apart? Well, what makes an async program async? It is the separation between call and result. You initiate work, and the code that handles the results runs whenever two conditions are true:

  1. You are ready to handle the work and
  2. The result is available.

Therefore, the special character of async problems is that you want to break down the work in such a way that both of those conditions are true as often as possible, so that your code is ready to process each result at the moment it becomes available.
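In its smallest form, that separation between call and result looks like this; expensive-work here is a hypothetical routine standing in for whatever you kick off:

my $result = start expensive-work();   # initiate the work now
# ... keep doing other things while it runs ...
say await $result;                     # handle the result once it is available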

That is the only special consideration you need when looking at breaking down async problems. Otherwise, your programming problems are all typical programming problems. That said, let’s now consider some practical considerations for various types of async coding methods in Raku.

Use react Blocks

My first piece of practical advice is to always use a react block (https://docs.raku.org/language/concurrency#index-entry-react) whenever you need to gather your work together. A react block is the perfect place to coordinate multiple asynchronous processes that work together.

As an example, I use a Raku program to render this web site statically. The tool I developed for doing this has a mode in it called build-loop which watches for changes to files and rebuilds the site when those changes occur. In production, it monitors a socket which gets pinged whenever the sync tool detects a change to the master git repo. In development, it uses IO::Notification to watch for changes on disk and also runs a micro-web server so I can serve those files in a way that emulates the deployed system.

It has a master react block which looks something like this:

react {
    my $needs-rebuild = True;
    with $notify-port {
        whenever IO::Socket::Async.listen('127.0.0.1', $notify-port) -> $conn {
            # manage a connection to set $needs-rebuild on ping
        }
    }

    with $server-port {
        whenever IO::Socket::Async.listen('127.0.0.1', $server-port) -> $conn {
            # micro-web server for developer mode here
        }
    }

    whenever Supply.interval($interval, :$delay) {
        if $needs-rebuild {
            $needs-rebuild = False;
            build-site();
        }

        once {
            # configure IO::Notification to set $needs-rebuild on change
        }
    }
}

I’ve left out a lot of the details, but this should give you a feel for it. I am able to coordinate the different means by which I discover changes that should cause a site rebuild. I throttle rebuilds so they happen at most once every $interval seconds, so groups of changes don’t re-trigger builds endlessly. I can simultaneously run a small web server for serving content in developer mode. And I’m doing all of this in the same event loop using a single thread.

The nice thing is that for every whenever within a react, we can share variables and state without ever having to worry about thread safety. The blocks may or may not be running on the same thread, but in any case, Raku guarantees they won’t run concurrently.

Therefore, a react block is perfect for coordinating various tasks together. Almost every async program I write has a main loop like this in it somewhere. If tasks are strongly independent, I may have one event loop for each group of tasks, with each react block running within a start. For example, I could have one event loop for handling graphics updates and another for running a networking back-end.

Prefer Pipelines

Whenever you break down your problem you will often have a choice of creating a Supplier object and feeding data into it or pipelining. If you can pipeline, then you should pipeline. The simplest example of a pipeline is the .map method on Supply:

my $original = Supply.interval(1);
my Supply $plus-one = $original.map(* + 1);

Doing this vastly simplifies your processing. It clearly demonstrates the dependency one task has on the one before it. It is easy to read and follow. It will save you many headaches.

See the documentation for Supply for other similar mapping methods that are built in. One of my favorites is .lines, which turns a Supply emitting arbitrary chunks of text into one that emits the text line by line.
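As a small sketch, .lines reassembles lines even when they arrive split across emitted chunks:

my $chunks = Supply.from-list("alpha\nbra", "vo\ncharlie\n");
$chunks.lines.tap: { .say };   # prints alpha, bravo, charlie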

The Cro services platform formalizes this idea of a pipeline of transformations. Almost the entire system is one pipeline from request to response, where each step of the way transforms the input one step closer to the eventual output. This is a very robust means of handling async processing.

Make Anything a Supply

If you are constructing a list, you can make that list using a supply block. This works best with non-trivial processes or when you have the need to reuse the Supply frequently.

my $primes = supply {
    for 1...* -> $n {
        emit $n if $n.is-prime;
    }
}

In the case of a trivial bit of processing like the supply block above and in a situation where you only need to tap it once, the simpler method may be to turn a Seq into a Supply by calling .Supply on the sequence:

my $primes = (1...*).grep(*.is-prime).Supply;

That latter example is functionally equivalent to the first and, in my opinion, much easier to read and follow. However, keep supply ready for when you need to generate a reusable Supply or one based on non-trivial logic.

Use Supplier for Splits and Joins

When you have a stream of incoming objects that require different processing, you can insert an if statement to handle each case inline, or you can re-emit those items to be processed in separate streams. If the processing is non-trivial, consider using a separate Supplier object for each type of processing. Then, use one more Supplier to join the streams back together if necessary.

This is similar to deciding whether to use a separate subroutine or not for a given problem with multiple solutions. Instead of separate subs, you can use separate whenevers.

Consider this problem, where we have a combined log and we want to treat error objects differently from access objects:

react {
    my Supplier $emitter .= new;
    my Supplier $error .= new;
    my Supplier $access .= new;

    whenever $emitter.Supply { .say }
    whenever $error.Supply -> %e {
        $emitter.emit: "%e<timestamp> Error %e<code>: %e<message>";
    }
    whenever $access.Supply -> %a {
        $emitter.emit: "%a<timestamp> Access: %a<url>";
    }
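    # Assumptions: $log is a Supplier feeding raw JSON log lines, and
    # from-json comes from a module such as JSON::Fast.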
    whenever $log.Supply.lines -> $line {
        given $line.&from-json {
            when so .<type> eq 'error' { $error.emit: $_ }
            when so .<type> eq 'access' { $access.emit: $_ }
            default { die "invalid log type" }
        }
    }
}

The reason splitting and joining can be better is that it can be a little easier to read and follow as each whenever is focused on a single task. In a case where one branch involves a longer process and the other branch involves a shorter process, it also allows you to consider how to best optimize each task separately.

On Demand vs. Live Supplies

You should be aware of the difference between the two kinds of supplies. The differences are somewhat subtle, and the two can often be used interchangeably.

  1. A live supply is created using the Supplier class. There is a single stream of events that are received by the current taps on the associated Supply object. If there are no taps, the events are not processed. If there are N taps, the .emit method of Supplier blocks until every tap has finished processing that event.

  2. An on-demand supply is created using a supply block or by calling the .Supply method on a list. Each tap of the Supply is effectively a separate process, receiving all the items generated by that supply object from start to end. The code that generates each item in the supply is run and again, the emit in the supply blocks until the single tap completes.

Essentially, a live supply uses a fan-out architecture while on-demand is really just a variation of Seq in how it behaves. I think of on-demand supplies as being just that, an adapter to make functions that return sequences work with whenever blocks.
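A small sketch of the difference:

# On-demand: each tap re-runs the generator from the start.
my $on-demand = supply { emit $_ for 1..3 };
$on-demand.tap: { print "A$_ " };   # A1 A2 A3
$on-demand.tap: { print "B$_ " };   # B1 B2 B3

# Live: one shared stream; only taps present at emit time see an event.
my Supplier $live .= new;
$live.emit(0);                      # lost, nobody is tapped yet
$live.Supply.tap: { print "C$_ " };
$live.emit($_) for 1..3;            # C1 C2 C3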

Avoid Supplier::Preserving

Then, there is Supplier::Preserving. Some think of this as a middle ground between the two types. However, the semantics of this object are identical to a live supply with one exception: the object buffers events emitted while there are no taps and dumps them all into the first tap that comes along.

Therefore, it is primarily a convenience in cases where it can be difficult to initialize your taps before you begin emitting. For example:

my Supplier::Preserving $msg .= new;
$msg.emit($_) for ^10;
$msg.Supply.tap: { .say };

Even though the tap happens after emitting to $msg, the program will print the numbers 0 through 9.

The problem is that Supplier::Preserving has risks associated with it, such as ballooning memory or long iterations over old data when first tapped. Instead, you should prefer to use Supplier and make sure all of your taps are in place before emitting.

my Supplier $msg .= new;
$msg.Supply.tap: { .say }
$msg.emit($_) for ^10;

Or just accept that you may miss a few events at the start. In some cases, you might actually want to use a Channel instead.
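For instance, a Channel buffers values until a receiver takes them, with each value going to exactly one receiver:

my Channel $queue .= new;
$queue.send($_) for ^10;   # values are buffered; no receiver needs to be in place yet
$queue.close;
.say for $queue.list;      # drains 0 through 9 after the fact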

There are cases where Supplier::Preserving is handy, so make use of it as needed. I’ve just found it to be an easy crutch for skipping proper bootstrapping when I’m being lazy, and in most cases it annoys me as time goes on.

Break up Long Running whenever Blocks

What is reasonable for your task may vary, but remember that inside a react block, code running in one whenever block will prevent all the others from running. A react block is really just a thin veneer over the old-fashioned event loop, where any sub-task can starve the others of processing time.

For example, consider the react block I mentioned for the build-loop tool above. When the build-site() routine runs, my web server cannot serve requests. Is that okay?

  1. It is a development process so I can tolerate some oddity in how the web server runs.
  2. I’m the only developer.
  3. It means that my web site waits until the site finishes building to refresh.
  4. I’d prefer to wait and only see fresh content.

Sounds like a win for me.

In production, I wouldn’t tolerate it. When it comes to web content, serving old content now is almost always better than making the user wait more than a few milliseconds for the freshest content. In that case, I would set up a separate web server thread. In this particular deployment there’s no application server at all, just static content, so that isn’t necessary.

That’s the sort of trade-off you have to weigh when designing your whenever blocks. If a whenever block runs long, the other blocks are put off. If that’s a bad thing, break that whenever up into a series of smaller whenever blocks chained together, as sketched below. Each time one whenever block in a long-running process finishes is an opportunity for a potentially starving task to take its turn.
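Here is a sketch of the idea, where $work is assumed to be a Supplier feeding in jobs and step-one and step-two are hypothetical stages of the long process:

react {
    my Supplier $stage2 .= new;

    whenever $work.Supply -> $item {
        $stage2.emit(step-one($item));   # yield to other whenevers between stages
    }

    whenever $stage2.Supply -> $partial {
        step-two($partial);
    }
}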

If a task like this is still problematic, you might need to move it to another thread via a start block.

Batch Short Tasks

Alternatively, trivial tasks involve a certain amount of overhead as the react block switches between them. If each task is very fast, you might want to consider using the .batch method on Supply, which lets you loop over groups of elements and avoid switching tasks as often. The .batch method is handy because it can break up the work both by a time delay and by a number of elements. This lets your program spend more time doing real work and less time on the busy work of deciding which task to schedule next.
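For example, this is a small sketch that processes up to 100 items at a time, or whatever has arrived after half a second, whichever comes first:

react {
    whenever Supply.from-list(^1000).batch(:elems(100), :seconds(0.5)) -> @group {
        say "processing {@group.elems} items in one go";
    }
}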

Avoid Sleep

If you are in a react block, you do not want to call sleep unless your purpose is to block all execution on the current thread. Otherwise, you should prefer using await to pause your task. If you do this, your react block can continue handling events until the await completes. If you need to wait for a number of seconds, you can do this:

await Promise.in(10); # sleep 10 seconds
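Inside a react block, the same pause is often better expressed as its own whenever, which keeps the rest of the block free to handle events the entire time:

react {
    whenever Promise.in(10) {
        say "ten seconds have passed";
        done;
    }
}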

Beware of Deadlocks

Even though Raku’s interfaces are composable, it is still possible to end up with deadlocks if you use them inappropriately. Anything inside a react block is guaranteed to run sequentially. This means that if you expect two whenever blocks to be able to run simultaneously, you will be disappointed when the code stops abruptly. I mention this because I run into this problem from time to time: even though I know a react block enforces the one-block-at-a-time rule, every now and then I still manage to imagine that multiple whenever blocks could run at the same time.

If you really need that, it is easy to fix. Just put a start block inside a whenever block and you can have two pieces of code running simultaneously.
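A sketch of that fix, with $jobs as an assumed Supplier and heavy-work a hypothetical routine:

react {
    whenever $jobs.Supply -> $job {
        whenever start heavy-work($job) -> $result {
            say "finished: $result";   # handled back under react's one-at-a-time rule
        }
    }
}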

Conclusion

That does it for now. In a few days, we will take up this conversation again, but instead of async, we will consider guidelines for divvying up work for concurrent processing.

Cheers.

»

Previously, I discussed the compare-and-swap operation as an operation to perform on atomicint variables. That’s just the tip of the iceberg. While most of the atomic ⚛ operators only work on atomicints, the cas function, atomic-fetch (or the prefix ⚛ operator), and atomic-assign (or the ⚛= operator) can all be used on any kind of Scalar variable.

First, we need to make sure we know what a Scalar is. In Raku, every variable name is associated with a container. If you want the full description of how that works, I recommend reading the language docs on Containers. For our purpose it will suffice to say that almost every regular variable with the $ sigil is contained by a Scalar. If you do something special when initializing such a variable, it may not have a Scalar container.

Here’s a quick example that should clarify enough for our purposes here:

# Any typical $ sigil variable represents a Scalar container
my $value = 42;

# Each index of an array is normally a Scalar container
my @array;
@array[0] = 10;

# Binding directly to Int, so this is NOT a Scalar.
my $constant := 100; # NOT Scalar

# Binding directly to an array index is also NOT a Scalar.
@array[1] := 20; # NOT Scalar

# Proxy containers are NOT Scalar containers
my $special := Proxy.new(
    FETCH => method () { 10 },
    STORE => method ($v) { },   # discard writes
);

If you try to use cas on a non-Scalar container, Raku will throw an exception that says something like “A Proxy container does not know how to do atomic compare and swap” so it should be pretty obvious what went wrong in most instances.

Enough of that. How do you use it? Let’s try an example:

my $atomic-string = '';
start {
    loop {
        cas $atomic-string, -> $v {
            if $v.ends-with('A') { $v ~ 'B' }
            else { $v }
        }
        sleep rand;
    }
}
start {
    loop {
        cas $atomic-string, -> $v {
            if $v eq '' || $v.ends-with('B') { $v ~ 'A' }
            else { $v }
        }
        sleep rand;
    }
}
start {
    loop {
        given ⚛$atomic-string {
            if .ends-with('B') && .chars %% 10 { .say }
        }
        sleep rand;
    }
}

sleep 10;

This is kind of a useless program, but it demonstrates what you can do. We have just a regular variable storing an empty string to start, and three tasks running concurrently. The first uses a cas operation to check whether the string ends with an “A” and, if so, appends a “B”. The second uses cas to check whether the string is empty or ends with a “B” and, if so, appends an “A”. The third outputs the string only if it ends with a “B” and has a length that is a multiple of 10. The program runs for 10 seconds and quits. It is an inefficient program, and one run on my laptop produced output like this:

ABABABABAB
ABABABABAB
ABABABABAB
ABABABABABABABABABAB
ABABABABABABABABABABABABABABAB

Essentially, every Scalar has access to an atomicint internally, which is used to lock the scalar while the change is being made. This can be more efficient than using Lock objects. When contention for access to some data is high, cas is likely to lose, because each thread trying to work with the data will be busy-waiting. However, when contention is low and you don’t need to efficiently block and resume threads, a cas operation can be more efficient. It may require benchmarking both approaches to determine which is more efficient for your particular case.

Later this month, I plan to demonstrate in greater detail how this cas operation can be used to implement data structures without locks. So we will return to this topic again soon.

Cheers.

»

Today I want to discuss the use of locks to make an object thread safe. That is, by employing a simple pattern for locking access to your object, you can effectively guarantee that only a single thread has access to any part of the object at a time. This, therefore, guarantees that the state of the object can never be corrupted, even when multiple threads attempt to access it concurrently.

Object-oriented Design

Before discussing concurrency with shared, mutable objects, let’s first consider what makes for a well-designed object in general. A newbie mistake in object-oriented design is to think of objects as containers of information. When designed this way, objects get modeled around the data they contain, and there is a temptation to expose that data through a series of getters and setters. A well-designed object, however, will contain state as a means of encapsulating the functionality that needs that state to operate. It does this by allowing other objects to send it messages via methods to perform operations, which may update the internal state as a side-effect.

As a contrived example, if we’re building a linked list with only a push operation, we could build a simple list like this:

class LinkedListRaw {
    has $.data;
    has $.next is rw;
}

my $list = LinkedListRaw.new(data => 1);
$list.next = LinkedListRaw.new(data => 2);
$list.next.next = LinkedListRaw.new(data => 3);

Whereas a much better design would be more like this:

class LinkedList {
    has $.data;
    has $!next;

    method push($data) {
        # Walk to the tail recursively; $!next stays private, so push
        # is the only way to extend the list.
        with $!next { $!next.push($data) }
        else        { $!next = LinkedList.new(:$data) }
    }
}

my $list = LinkedList.new(data => 1);
$list.push: 2;
$list.push: 3;

Now, the reason I took a moment to mention good object-oriented design practice is because the monitor pattern we will now consider relies on well-designed objects to be effective. (I also mention it because bad object-oriented design practices are pervasive even among otherwise good engineers.)

Monitors

Concurrency is easiest when you have a stateless system or a system whose state is based on transforming immutable objects. For example, if you just need to feed calculations from one thread to another, it’s very easy for each stage to keep a copy of the current value, transform that value, and pass a new copy on to the next stage.

However, stateful objects can present a challenge because a change to the object’s state might be only partially applied in one thread when another thread begins a new operation with it. If we don’t protect our object from performing multiple state changes at once or from reads while a state change is only partially complete, our code will not be thread safe. You should never use such an object from multiple threads at once. (Most built-in Raku objects are such objects!)

If you are in a situation where copying your object state is not practical and you need to share state between threads, a very easy solution is to employ the monitor pattern. Here’s a thread safe version of our push-only linked list from before using the monitor pattern:

class LinkedListSafe {
    has $.data;
    has $!next;
    has Lock $!lock .= new;

    method push($data) {
        $!lock.protect: {
            with $!next { $!next.push($data) }
            else        { $!next = LinkedListSafe.new(:$data) }
        }
    }
}

That’s it. That’s the whole monitor pattern: just use a Lock to protect all the code of every public method that reads or writes the mutable state of the object. While this is easy, there are a couple of downsides to employing this pattern:

  1. This is generally a low-performing solution if contention for state changes is high. For example, if many threads will be performing many pushes frequently on this linked list, the performance will not be very good.

  2. Adding the $!lock.protect: { ... } bit around every section is tedious to do and easy to forget during development.

To improve the first situation, make sure that your monitor contains only the code related to encapsulating the object’s state. Create secondary objects that are not monitors for calculations, actions, and other stateless work.

For the second, I recommend using a module by Jonathan Worthington, who has written a tool to automate the implementation of the monitor pattern. If you install OO::Monitors, you can rewrite the above linked list as:

use OO::Monitors;

monitor LinkedListMonitor {
    has $.data;
    has $!next;

    method push($data) {
        with $!next { $!next.push($data) }
        else        { $!next = LinkedListMonitor.new(:$data) }
    }
}

A monitor is a class implementing the monitor pattern. Every method is automatically protected by a lock for you.

In cases where you need to make a stateful object thread safe and you want a simple mechanism for doing it, this is a reasonable pattern to follow. If performance is a primary concern, a monitor object may not be for you, though. Finally, be aware that this pattern depends on thoughtful OO design to work at all.

Cheers.

»

In Raku, there are different ways to pause your code. The simplest and most obvious way to do so is to use sleep:

my $before = now;
sleep 1;
my $after = now - $before;
say $after;

Assuming your system is not bogged down at the moment, the output of $after should be a number pretty close to 1. Not very exciting: you didn’t do anything for a second. Woo. Woo.

In real code, however, you do sometimes need to pause. For example, you might be trying to send an email and it fails. When sending email, it is nice if it arrives quickly, but essential that it arrives eventually. Thus, when queueing up a message, you want to watch for errors. When they occur, you want to keep trying for a long time before giving up. However, you don’t want to keep trying constantly. You need to pause between retries.

If I were building a naïve implementation in memory (rather than a more reasonable on-disk queue), I could do something like this:

start {
    my $retries = 10;
    my $success;
    for ^$retries -> $retry {
        $success = sendmail();
        
        last if $success;

        sleep 60 * 2 ** $retry;
    }

    die "Never could send that email." unless $success;

    $success;
}

In this code, you have a sendmail function that we assume does all the right things to send an email. You try to send up to 10 times, checking for success and sleeping on an exponentially lengthening interval that spreads the retries out over about the next 18 hours. After that, you give up. To avoid blocking the regular work of the process for up to 18 hours, you run the entire thing in a start block. The Promise returned will be kept when the email is sent or broken if sending ultimately fails.

There’s a problem, though. This code will block a thread for up to 18 hours. And threads are a very finite resource: there are at most 64 in the default thread pool. That’s not good. The process can still work, but this thread is locked up doing a whole lot of nothing. Threads are expensive resources to use this way. If you have to send email like this more often than about once every 15 minutes, you will run out of threads.

How do you solve this? You could reconfigure Raku to use a scheduler with more threads in the pool, but the goal of threads is to do stuff. Why waste a thread doing nothing unless you really have no work for it to do?

You can fix it in a way that frees up your threads to keep working and pauses the task. The await statement is a way for your code to tell Raku, “You can have my thread back, if you need it.” So let’s change that code above to read this way instead:

start {
    my $retries = 10;
    my $success;
    for ^$retries -> $retry {
        $success = sendmail();
        
        last if $success;

        await Promise.in(60 * 2 ** $retry);
    }

    die "Never could send that email." unless $success;

    $success;
}

Now the code will go to sleep for the allotted time, but the thread is freed up for Raku to reuse, which means your application won’t be stuck waiting 18 hours while the mail server is down the next time you need to spam 65 people at once.

This is true in general, not just for pausing. Any time you use await (so long as you have a version of Raku supporting spec 6.d or later), Raku will be able to reuse that thread if the thing being awaited is not ready yet.
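For example, the same thread-releasing behavior applies when awaiting many promises at once; fetch-page here is a hypothetical routine:

my @jobs = (^5).map: { start fetch-page($_) };
my @pages = await @jobs;   # the thread returns to the pool while any job is pending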

Cheers.

»

Map-reduce is a common way of solving problems in functional programming. You have a list of items, you iterate through them to process them, and then you take the set and summarize them. We call this map-reduce because the iteration step is mapping values to new values and the summarize step reduces the number of values.

In Raku, map-reduce is a common programming pattern:

my $fibonacci = (1, 1, * + * ... *);
my $double-sum = $fibonacci[^100].grep(*.is-prime).map(* * 2).reduce(* + *);

This creates a Seq with the sequence operator: the classic Fibonacci sequence. We then take the first 100 elements, filter out any non-primes, double the primes, and then sum the values.

That’s a weird operation, but it demonstrates the kind of task you would perform with the map-reduce pattern: take a sequence of data (the first 100 elements of the Fibonacci sequence, in this case), filter it to keep only the prime numbers, transform those values by doubling them, and then reduce them to a final sum.

In this case, the answer is not a difficult calculation and comes back almost instantaneously, but what if we needed to do the same for the first 4,000 elements instead? That would likely take seconds on a typical system today. Since .grep and .map must iterate over each value to filter and transform it, there’s no particular reason those operations have to be performed sequentially for every value.1

Raku provides tools that let you parallelize this task in different ways with only a small change. Consider this variation:

my $fibonacci = (1, 1, * + * ... *);
my $double-sum = $fibonacci[^100].race.grep(*.is-prime).map(* * 2).reduce(* + *);

By inserting .race at the beginning, we tell Raku to perform the operation in parallel. It will split the work into batches and run the batches as separate tasks scheduled on separate threads. On my system, that operation runs 2 to 3 times faster than the first.

Raku provides a couple of different strategies for parallelizing map-reduce tasks.

  • The .race method breaks the operation up to perform the work in batches that are processed concurrently. It does nothing to guarantee that the order of the original items is preserved, though. The items are just returned as the threads finish.

  • The .hyper method is very similar to .race, but it guarantees that the order of the items from the original list is preserved. This means processing of later parts of the list will be held up if earlier parts take longer. It is not quite as efficient, but if you need the order preserved, .hyper guarantees it.

Each of these methods takes a :batch and a :degree parameter in case you want to customize how the work is broken up and executed. Raku attempts to pick reasonable defaults, but tuning these for your particular setup will probably yield some improvement when you need to eke out even better performance.

The :degree option selects how many workers to start. For CPU bound work, it is generally best to pick a number equal to the number of CPU cores available. There’s no reason to go higher because you’ll never run more than that many jobs at once in such a case. However, if the work involves waiting on the disk or network, it’s very likely your code will be paused for milliseconds or longer waiting for IO. In those cases, it can be wise to increase :degree to several times the number of CPUs to account for the wait time.

The :batch option decides how the work is broken up. A large batch is useful when each unit of work is fast, which keeps your throughput high. A small batch, even down to 1, is reasonable when each unit of work is long or you want each result as soon as you can get it.

So, with that in mind, we could further tune the work above like this:

my $fibonacci = (1, 1, * + * ... *);
my $double-sum = $fibonacci[^4000].race(:batch(1000), :4degree).grep(*.is-prime).map(* * 2).reduce(* + *);

In this case, tuning doesn’t make a large difference on my 4-core laptop, but on a system with more than 4 cores, it is likely that tuning will help some.

So, any time you have a task where you need to iterate through items and operate on them, and you have CPU time to spare to speed things up, consider employing .hyper or .race in your code.

Cheers.


  1. Raku does some amount of optimization already for this operation. For example, it does not perform a separate loop for .grep and .map. These operations will be chained together in a sequence so there's essentially only a single iteration being performed.  ↩

»

Let’s consider now how to solve a big problem with concurrency. If you have an algorithmic problem with lots of data that needs to be processed, you want to maximize the amount of load you can place on the available CPU cores to process it as quickly as possible. To demonstrate how we can go about this in Raku, we will consider Conway’s Game of Life, played on an effectively infinite game board.1

Let’s start by defining Conway’s Game of Life, just in case you haven’t run across it before. The Game of Life is a simulation invented by British mathematician John Conway. It is played on a simple grid where each square is called a cell. Each cell may be either alive or dead during a given turn. Each cell has 8 neighbors, which are the cells directly above, below, left, right, and each of the 4 diagonals. To determine the state of a cell for the next turn, you apply the following rules to the current state of the cell and its neighbors:

  1. Any live cell with fewer than two live neighbors dies.
  2. Any live cell with two or three live neighbors lives on.
  3. Any live cell with more than three live neighbors dies.
  4. Any dead cell with exactly three live neighbors comes to life.

If you want more details about this, you should check out the Wikipedia article on Conway’s Game of Life.

I have implemented Conway’s Game of Life in a nice long program with graphics (or text) output and all. However, it’s about 400 lines of code, so I’m not going to include all of it here. You can check out the ongoing evolution of this project on my github.

The simulator has a role named Game::Life::Player which defines the requirements for the player object. This object is responsible for performing the rules of the game. Specifically, the .next-turn-for method is given an immutable copy of the current game board, a set of bounds, and a mutable copy of the next game board to write to. It is responsible for turning the current board into the board for the next turn according to the rules just mentioned.

Here is a copy of that from the Game::Life::Player::Basic implementation, which is basically the simplest way to go about this:

role Game::Life::Player {
    ...
    method next-turn-for-cell(
        Int:D $x,
        Int:D $y,
        Board:D $current,
        Board:D $next,
    ) {
        # Is the cell currently live?
        my $live      = $current.cell($x, $y);

        # How many live neighbors does it currently have?
        my $neighbors = [+] $current.neighbors($x, $y);

        # If alive and has too many or too few neighbors, die.
        if $live && !(2 <= $neighbors <= 3) {
            return $next.kill($x, $y);
        }

        # if dead and has the right number of neighbors, come to life.
        elsif !$live && $neighbors == 3 {
            return $next.raise($x, $y);
        }

        else {
            return Nil;
        }
    }
}

class Game::Life::Player::Basic does Game::Life::Player {
    ...
    method next-turn-for(
        Int:D $l,
        Int:D $t,
        Int:D $r,
        Int:D $b,
        Board:D $current,
        Board:D $next,
    ) {
        for $l..$r -> $x {
            for $t..$b -> $y {
                self.next-turn-for-cell($x, $y, $current, $next);
            }
        }
    }
}

The implementation simply iterates through every cell within the boundary and runs .next-turn-for-cell on it. That method is implemented in the role and just applies the rules to a single cell. Easy.2

Iterating through this in a single chunk is going to take a long time, even for relatively small playing fields. To improve this situation, we can divide the work into reasonably sized chunks and process each chunk in a separate task. With multiple threads, we should be able to cut the time required down by up to a factor of N, where N is the number of cores available for computation. In reality, you will get somewhat less than that, but we should definitely be able to improve speed this way.

How might we do it? Here’s one possible solution that makes sure we never process chunks that are larger than 20 by 20, making for about 400 computations in a row. Gaining maximum efficiency requires some tuning, so a given system might do better with different numbers, but you get the idea.

Here’s an implementation of parallel-next-turn-for that’s part of the Game::Life::Player::DivideAndConquer player class:

class Game::Life::Player::DivideAndConquer is Game::Life::Player::Basic {
    ...
    method parallel-next-turn-for(
        Int:D $l,
        Int:D $t,
        Int:D $r,
        Int:D $b,
        Board:D $current,
        Board:D $next,
    ) {
        my @jobs = gather {
            if $r - $l > 20 {
                my $m = ceiling($l + ($r - $l)/2);
                #dd $l, $m, $r;

                take start self.parallel-next-turn-for($l, $t, $m - 1, $b, $current, $next);
                take start self.parallel-next-turn-for($m, $t, $r, $b, $current, $next);
            }

            elsif $b - $t > 20 {
                my $m = ceiling($t + ($b - $t)/2);
                #dd $t, $m, $b;

                take start self.parallel-next-turn-for($l, $t, $r, $m - 1, $current, $next);
                take start self.parallel-next-turn-for($l, $m, $r, $b, $current, $next);
            }

            else {
                take start self.next-turn-for($l, $t, $r, $b, $current, $next);
            }
        }

        await Promise.allof(@jobs);
    }
}

This takes the same input as before, but if there are too many columns to process, we cut the work in half by columns. If the columns are reasonable, but there are too many rows, we cut the work in half by rows. If the size is just right, we use the next-turn-for we inherited from Game::Life::Player::Basic.

Whether we split into two tasks or just do the work for some section of cells, we use start blocks to schedule the work and then await the outcome. Subdividing this way means we create a hierarchy of tasks that can subdivide again and again. The Raku scheduler will then run those tasks as threads become available.

On my 2015 Macbook Pro, the game runs through 200 turns of the Gosper glider gun in around 35 seconds using 100% CPU when running sequentially. The same program runs around 20–25 seconds at close to 300% CPU when running in parallel. It would probably be higher if I didn’t also have the rendering task occasionally using CPU to redraw the graphics window. But what fun would that be?

So, that’s a concurrency pattern you can employ when you have multiple cores available and an algorithm that lends itself to being broken up into parts.

Cheers.


  1. I am not at all claiming this is an efficient way overall to simulate Conway's Game of Life, but I am demonstrating how to take one version of that simulator and making it more efficient with the use of concurrency.  ↩

  2. This is grossly inefficient as large sections of the board are very likely to be blank. As I already mentioned, though, my goal here is not to implement an efficient player, but to show how we can improve efficiency using concurrency.  ↩

»

What’s faster than locks? Compare-and-swap. Modern CPUs have multiple cores. As such, all modern CPUs must have tools for performing absolutely atomic operations to allow those multiple cores to work together. One of those operations is the compare-and-swap or cas operation.

In the abstract, a cas operation takes three arguments: a variable to modify, an expected value, and a new value. The variable is set to the new value only if the current value held by the variable is equal to the expected value, and this is performed as a single operation guaranteed to be safe from interference by any other concurrent operation. Along the way, the system sets a flag marking the success or failure of the operation. If the variable holds a different value than expected, the variable is left unchanged and the operation fails. To complete an atomic change, you repeatedly run a calculation ending in a cas operation until it succeeds. That may not sound very efficient, but it turns out that in most cases it is faster than locks.

Raku provides direct access to this operation through the cas function on the atomicint type and also through a number of operators which help you perform these atomic changes.

To demonstrate one possible use of atomicint, let’s first consider the ATM problem. Two people have access to a bank account. Let’s say the account has $1000 in it at the beginning of the day. Person One withdraws $100 from an ATM in city center and Person Two deposits $250 at the ATM in the airport. At the end of the day, we clearly want the new balance to be $1150. However, if both transactions happen simultaneously, that is not guaranteed unless we take some care to guarantee it.

Let’s start by writing our code in the naïve way:

my Int $balance = 1000;
my $one = start { $balance = $balance - 100 }; # one
my $two = start { $balance = $balance + 250 }; # two
await $one, $two;
say $balance;

Unfortunately, it is now possible to end up with a balance different from $1150. If these two tasks actually run simultaneously, their operations could be interleaved like this:

  1. Block one reads a $balance of 1000.
  2. Block two reads a $balance of 1000.
  3. Block one subtracts 100 from 1000 to get 900.
  4. Block two adds 250 to 1000 to get 1250.
  5. Block one sets the $balance to 900.
  6. Block two sets the $balance to 1250.

The read and write operations must be executed sequentially for this code to work correctly. We could use a lock or semaphore or some other traditional construct to form a critical section around modifications to the variable, but those are typically pretty dang slow.

Instead, we can use an atomic operation so that each read-modify-write behaves as if it happened sequentially. If we rewrite the above code using atomicint, we end up with this:

my atomicint $balance = 1000;
my $one = start { $balance ⚛-= 100 }; # one
my $two = start { $balance ⚛+= 250 }; # two
await $one, $two;
say $balance;

Here the result will always be the expected 1150. Let’s assume these two tasks run simultaneously exactly as before, but each update is now performed as a compare-and-swap that retries on failure rather than a separate read and regular set. It would play out like this:

  1. Block one reads a $balance of 1000.
  2. Block two reads a $balance of 1000.
  3. Block one subtracts 100 from 1000 to get 900.
  4. Block two adds 250 to 1000 to get 1250.
  5. Block one performs compare-and-swap $balance from 1000 to 900 and succeeds.
  6. Block two performs compare-and-swap $balance from 1000 to 1250 and fails.
  7. Block two reads a $balance of 900.
  8. Block two adds 250 to 900 to get 1150.
  9. Block two performs compare-and-swap $balance from 900 to 1150 and succeeds.

The extra steps 7–9 occur because, in a cas operation, failure is resolved by repeating the operation until it succeeds. Unless contention for writes to a single variable is extreme, this should not lead to any task retrying indefinitely.

If you don’t like emoji in your code, that’s fine. There’s a Texas (plain ASCII) function for performing each of the operations provided by an atomic ⚛ operator. The atomicint type provides the following operations:

  • atomic-assign or ⚛=
  • atomic-fetch or the prefix ⚛ for performing atomic reads of the value
  • atomic-fetch-inc or the postfix ⚛++
  • atomic-fetch-dec or the postfix ⚛--
  • atomic-fetch-add or ⚛+=
  • atomic-fetch-sub or ⚛-=
  • atomic-inc-fetch or the prefix ++⚛
  • atomic-dec-fetch or the prefix --⚛
  • cas

Let’s quickly consider the cas function itself, which underlies the implementation of the other operators. It allows you to implement any operation that involves a compare-and-swap of an atomicint. It has two basic forms, and just to demonstrate how they work, we can re-implement our ATM problem from above using both of them.

First, consider this program using the cas function:

my atomicint $balance = 1000;
sub update-balance($change-by) {
    my $new-balance;
    loop {
        my $old-balance = ⚛$balance;
        $new-balance = $old-balance + $change-by;
        if cas($balance, $old-balance, $new-balance) == $old-balance {
            last;
        }
    }
    return $new-balance;
}
my $one = start update-balance(-100); # one
my $two = start update-balance(250);  # two
await $one, $two;
say $balance;

This is functionally identical to the version above, though more verbose. It gives you direct access to the cas operation itself and lets you control the number of retries and the specific operation performed. The two-integer cas works something like this, though the entire operation is atomic:

multi sub cas(atomicint $target is rw, int $expected-value, int $new-value --> int) {
    my int $seen = $target;
    if $seen == $expected-value {
        $target = $new-value;
    }
    return $seen;
}

That means it will return the value it saw in every case, but will have changed the value stored in the target when what it saw matches what you expected.

There is a second version of this method, which can help streamline your code a bit by performing the looping for you. To write the ATM problem yet again, we can also write it like this:

my atomicint $balance = 1000;
sub update-balance($change-by) {
    cas $balance, -> $seen-value { $seen-value + $change-by }
}
my $one = start update-balance(-100); # one
my $two = start update-balance(250);  # two
await $one, $two;
say $balance;

This greatly shortens the code required to implement this operation. It does so by performing the given operation repeatedly in a loop until the cas operation succeeds. In this case, the cas function is defined like this, but again, using atomic operations where appropriate:

multi sub cas(atomicint $target is rw, &operation) {
    loop {
        my int $seen = $target;
        my $new-value = operation($seen);
        if $seen == $target { # still equal?
            $target = $new-value;
            return $new-value; # only return once the swap succeeds
        }
    }
}

Be aware that the cas alternative that takes a function returns the new value that was set, while the cas that takes two integers returns the value that was seen. That is, the second, compact form treats the return value like that of an assignment operation, but the way the other form works does not allow for that treatment.
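A quick demonstration of the difference:

my atomicint $n = 5;
say cas($n, 5, 6);       # 5 — the two-integer form returns the value it saw
say cas($n, { $_ + 1 }); # 7 — the code form returns the new value it stored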

I’d really like to share with you how cas can be used on any scalar value in Raku, but this post has already gone long. I will cover that later this Advent.

Cheers.

»

When writing concurrent code in Raku, we want to avoid sharing data between tasks. This is because code that shares no data is automatically safe and doesn’t have to worry about interdependencies with other code. So, when you can, you should do your work through Supply, Promise, and Channel objects that are then synchronized together by a central thread. That way, all the state changes are safe.

This is not always practical, though. Sometimes it really is more efficient to share data that can change between tasks running on separate threads. However, such sharing is inherently unsafe.

For example, here’s a very simple multi-threaded application in Raku that is not thread safe and will lead to incorrect results:

my $x = 0;
my @p = (
    start for ^100 { $x++; sleep rand/10; },
    start for ^100 { $x++; sleep rand/10; },
);
await Promise.allof(@p);
say $x;

We might expect the value of $x to be 200, but it is very unlikely it will be 200. It will almost certainly be lower. This is because the simple $x++ operation needs to:

  1. Read the value of $x.
  2. Add one to the value read from $x.
  3. Store the calculated value back into $x.

If it happens that the first task performs steps 1 and 2, and the second task then performs steps 1 through 3 before the first task completes step 3, at least one of the increment operations will be lost. Over the course of 100 iterations in each task, I would expect 5 or 10 writes to be lost, and each run is likely to give a slightly different answer. This is what it means to be unsafe when it comes to concurrency.

For this particular program, the three steps above constitute a critical section. A critical section is a piece of code that must be performed sequentially if it is to work at all. In the Raku code itself, the critical section is just the $x++ statement in each loop.

One mechanism for ensuring a critical section in your code is handled in a thread safe way is to use a mutual exclusion lock. Raku provides the Lock class which can be used for this purpose.

When you create a Lock object, you can .lock or .unlock the lock in your code. Calling the .lock method will block your code from running further if any other code has called .lock on the same object without calling .unlock. As soon as the thread holding the lock calls .unlock, another thread waiting for the lock to release will be allowed to continue.

In our example above, we might modify it as follows to make it thread safe:

my $x = 0;
my $lock = Lock.new;
my @p = (
    start for ^100 { $lock.lock; $x++; $lock.unlock; sleep rand/10; },
    start for ^100 { $lock.protect: { $x++ }; sleep rand/10; }
);
await Promise.allof(@p);
say $x;

I didn’t mention .protect above, but it does the same thing as calling .lock, running the given block, and then calling .unlock. It has the advantage, though, of making sure the .unlock call happens even if something goes wrong inside the block. In the first loop above, where we use .lock and .unlock, a thrown exception could cause the lock to remain permanently locked. Using .protect avoids this risk automatically, so it’s the preferred way to use Lock.

Before I conclude, I want to mention a couple negatives to locks. First, locks do not perform very well. They are simple to implement and straightforward to use, but mutual exclusion can come at a high cost to performance. You probably want to make sure your locks are used sparingly and used only to protect the critical sections.

The other major drawback is that locks give you thread safety but may add the risk of deadlock. I mentioned one deadlock risk already: a task raising an error that prevents a lock from being unlocked. That lock is now dead, and no task can take it. When multiple locks are involved, the risks of deadlock can be much more subtle and very difficult to find. Unlike Supply or Promise, a Lock is not safely composable. This means two libraries using locks to secure themselves might deadlock when used together if you are not careful.

Despite their drawbacks, locks are useful things for making code thread safe. Later in this Advent Calendar we will use locks to demonstrate how to create thread safe data structures.

Cheers.