Sterling has too many projects

Blogging about Raku programming, microcontrollers & electronics, 3D printing, and whatever else...

»

Today I want to discuss the use of locks to make an object thread safe. That is, by employing a simple pattern for locking access to your object, you can effectively guarantee that only a single thread has access to any part of the object at a time. This, therefore, guarantees that the state of the object can never be corrupted, even when multiple threads attempt to access it concurrently.

Object-oriented Design

Before discussing concurrency with shared, mutable objects, let’s first consider what makes for a well-designed object in general. A newbie mistake in object-oriented design is to think of objects as containers of information. When designed this way, objects get modeled around the data they contain, and there is a temptation to expose that data through a series of getters and setters. A well-designed object, however, will contain state as a means of encapsulating functionality which needs that state to operate. It does this by allowing other objects to send it messages via methods to perform those operations, which may update the internal state as a side effect.

As a contrived example, if we’re building a linked list with only a push operation, we could build a simple list like this:

class LinkedListRaw {
    has $.data;
    has $.next is rw;
}

my $list = LinkedListRaw.new(data => 1);
$list.next = LinkedListRaw.new(data => 2);
$list.next.next = LinkedListRaw.new(data => 3);

Whereas a much better design would be more like this:

class LinkedList {
    has $.data;
    has $!next;

    # private accessor, so methods of this class can reach the
    # next node of another instance without exposing it publicly
    method !next() is rw { $!next }

    method !tail() {
        my $here = self;
        loop { return $here without $here!next; $here = $here!next }
    }

    method push($data) {
        my $tail = self!tail;
        $tail!next = LinkedList.new(:$data);
    }
}

my $list = LinkedList.new(data => 1);
$list.push: 2;
$list.push: 3;

Now, the reason I took a moment to mention good object-oriented design practice is because the monitor pattern we will now consider relies on well-designed objects to be effective. (I also mention it because bad object-oriented design practices are pervasive even among otherwise good engineers.)

Monitors

Concurrency is easiest when you have a stateless system or a system whose state is based on transforming immutable objects. For example, if you just need to feed calculations from one thread to another, it’s very easy for each stage to keep a copy of the current value, transform that value, and pass a new copy on to the next stage.

However, stateful objects can present a challenge because a change to the object’s state might be only partially applied in one thread when another thread begins a new operation with it. If we don’t protect our object from performing multiple state changes at once or from reads while a state change is only partially complete, our code will not be thread safe. You should never use such an object from multiple threads at once. (Most built-in Raku objects are such objects!)
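To see the danger concretely, here is a small illustrative sketch (not from the original post) of pushing onto a plain Array from several tasks at once; because Array is not thread safe, increments of its internal state can interleave:

```raku
my @unsafe;

# Four tasks, each pushing 1000 values concurrently.
await (^4).map: { start { @unsafe.push($_) for ^1000 } }

# We would hope for 4000 elements, but concurrent pushes can be
# lost or corrupt the array's internals, and may even crash.
say @unsafe.elems;
```

Run it a few times and the element count will vary, which is exactly what "not thread safe" means in practice.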

If you are in a situation where copying your object state is not practical and you need to share state between threads, a very easy solution is to employ the monitor pattern. Here’s a thread safe version of our push-only linked list from before using the monitor pattern:

class LinkedListSafe {
    has $.data;
    has $!next;
    has Lock $!lock .= new;

    method !next() is rw { $!next }

    method !tail() {
        my $here = self;
        loop { return $here without $here!next; $here = $here!next }
    }

    method push($data) {
        $!lock.protect: {
            my $tail = self!tail;
            $tail!next = LinkedListSafe.new(:$data);
        }
    }
}

That’s it. That’s the whole monitor pattern: just use a Lock to protect all the code of every public method that reads or writes the mutable state of the object. While this is easy, there are a couple of downsides to employing this pattern:

  1. This is generally a low-performing solution if contention for state changes is high. For example, if many threads will be performing many pushes frequently on this linked list, the performance will not be very good.

  2. Adding the $!lock.protect: { ... } bit around every section is tedious to do and easy to forget during development.

To improve the first situation, make sure that your monitor only contains the code related to encapsulating object state. Move calculations, actions, and other stateless work into secondary objects that are not monitors.
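As a rough sketch of that division (the class names here are hypothetical), the slow, stateless work happens outside the lock, and the monitor-style class guards only its own mutable state:

```raku
# Stateless helper: no shared state, so no lock is needed, and
# many threads can call it concurrently without contention.
class PrimeChecker {
    method check($n) { $n.is-prime }
}

# This class guards only its own mutable state with the lock.
class PrimeLog {
    has @!primes;
    has Lock $!lock .= new;
    has PrimeChecker $!checker .= new;

    method record($n) {
        # Do the (potentially slow) calculation outside the lock...
        my $ok = $!checker.check($n);

        # ...and hold the lock only long enough to update state.
        $!lock.protect: { @!primes.push($n) if $ok };
    }

    method count() { $!lock.protect: { @!primes.elems } }
}
```

The lock is now held only for the cheap state update, so contention stays low even if the calculation is expensive.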

For the second, I recommend using a module by Jonathan Worthington. He has written a tool to automate the implementation of the monitor pattern. If you install OO::Monitors, you can rewrite the above linked list as:

use OO::Monitors;

monitor LinkedListMonitor {
    has $.data;
    has $!next;

    method !next() is rw { $!next }

    method !tail() {
        my $here = self;
        loop { return $here without $here!next; $here = $here!next }
    }

    method push($data) {
        my $tail = self!tail;
        $tail!next = LinkedListMonitor.new(:$data);
    }
}

A monitor is a class implementing the monitor pattern. Every method is automatically protected by a lock for you.

In cases where you need to make a stateful object thread safe and you want a simple mechanism for doing it, this is a reasonable pattern to follow. If performance is a primary concern, a monitor object may not be for you, though. Finally, be aware that this pattern depends on thoughtful OO design to work at all.

Cheers.

»

In Raku, there are different ways to pause your code. The simplest and most obvious way to do so is to use sleep:

my $before = now;
sleep 1;
my $after = now - $before;
say $after;

Assuming your system is not bogged down at the moment, the output of $after should be a number pretty close to 1. Not very exciting: you didn’t do anything for a second. Woo. Woo.

In real code, however, you do sometimes need to pause. For example, you might be trying to send an email and it fails. When sending email, it is nice if it arrives quickly, but essential that it arrives eventually. Thus, when queueing up a message, you want to watch for errors. When they occur, you want to keep trying for a long time before giving up. However, you don’t want to keep trying constantly. You need to pause between retries.

If I were building a naïve implementation in memory (rather than a more reasonable on disk queue), I could do something like this:

start {
    my $retries = 10;
    my $success;
    for ^$retries -> $retry {
        $success = sendmail();

        last if $success;

        sleep 60 * 2 ** $retry;
    }

    die "Never could send that email." unless $success;

    $success;
}

In this code, you have a sendmail function that you assume does all the right things to send an email. You will try to send up to 10 times. You check for success, and then you sleep using an exponentially lengthening interval that will spread the retries out over about the next 18 hours. After that, you give up. To avoid blocking the regular work of the process for up to 18 hours, you run the entire thing in a start block. The Promise returned will be kept whenever the email is sent or broken if the email fails.

There’s a problem, though. This code will block a thread for up to 18 hours, and threads are a very finite resource: there are at most 64 in the default thread pool. That’s not good. The process can still work, but this thread is locked up doing a whole lot of nothing. Threads are expensive resources to use this way. If you have to send email more than once every 15 minutes or so, you will run out of threads.

How do you solve this? You could reconfigure Raku to use a scheduler with more threads in the pool, but the goal of threads is to do stuff. Why would you want to waste a thread doing nothing unless you really have no work for it to do?
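For reference, a hedged sketch of what such a reconfiguration can look like (the PROCESS::&lt;$SCHEDULER&gt; rebind and the RAKUDO_MAX_THREADS environment variable are Rakudo-specific details):

```raku
# Rebind the process-wide default scheduler to a larger pool.
# This must happen before any work is scheduled on the old one.
PROCESS::<$SCHEDULER> = ThreadPoolScheduler.new(max_threads => 256);

# Alternatively, set this in the environment before launching:
#     RAKUDO_MAX_THREADS=256 raku your-program.raku
```

But as the next section shows, freeing the thread is the better fix than buying more threads.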

You can fix it in a way that frees up your threads to keep working and pauses the task. The await statement is a way for your code to tell Raku, “You can have my thread back, if you need it.” So let’s change that code above to read this way instead:

start {
    my $retries = 10;
    my $success;
    for ^$retries -> $retry {
        $success = sendmail();

        last if $success;

        await Promise.in(60 * 2 ** $retry);
    }

    die "Never could send that email." unless $success;

    $success;
}

Now the code will go to sleep for the allotted time, but the thread is freed up for Raku to reuse, which means your application won’t be stuck waiting 18 hours while the mail server is down the next time you need to spam 65 people at once.

This is true in general, not just for pausing for sleep. Any time you use an await (so long as you have a version of Raku supporting spec 6.d or later), Raku will be able to reuse that thread if the thing being awaited is not ready yet.

Cheers.

»

Map-reduce is a common way of solving problems in functional programming. You have a list of items, you iterate through them to process them, and then you take the set and summarize them. We call this map-reduce because the iteration step is mapping values to new values and the summarize step reduces the number of values.

In Raku, map-reduce is a common programming pattern:

my $fibonacci = (1, 1, * + * ... *);
my $double-sum = $fibonacci[^100].grep(*.is-prime).map(* * 2).reduce(* + *);

This creates a Seq with the sequence operator; this is the classic Fibonacci sequence. We then take the first 100 elements of Fibonacci, filter out any non-primes, double the primes, and then sum the values.

That’s a weird operation, but demonstrates the kind of tasks you would perform with the map-reduce pattern. We take a sequence of data (the first 100 elements of the Fibonacci sequence, in this case), we filter the data to keep only the prime numbers, we double the values, and then we add them together to get a final sum.

In this case, the answer is not a difficult calculation and probably near-instantaneous, but what if we needed to do that operation for the first 4,000 elements instead? That would likely take seconds on a typical system today. As the .grep and .map must iterate over each value to filter and transform, there’s no particular reason we have to do those operations sequentially for every value.1

Raku provides tools that let you parallelize this task in different ways with only a small change. Consider this variation:

my $fibonacci = (1, 1, * + * ... *);
my $double-sum = $fibonacci[^100].race.grep(*.is-prime).map(* * 2).reduce(* + *);

By inserting the .race at the beginning, we tell Raku to perform the operation in parallel. It will split the task into parts and run the parts as separate tasks, which will be scheduled on separate threads. On my system, that operation runs two to three times faster than the first.

Raku provides a couple different strategies for parallelizing map-reduce tasks.

  • The .race method breaks the operation up to perform the work in batches that are processed concurrently. It does nothing to guarantee that the order of the original items is preserved, though. The items are just returned as the threads finish.

  • The .hyper method is very similar to .race, but it guarantees that the order of the items from the original list is preserved. This means processing on later parts of the list will be held up if parts earlier in the list take longer. It is not quite as efficient, but if you need to preserve the order, .hyper is the one to use.
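A quick sketch of the difference (a minimal example, not from the original post): with small batches, .hyper always yields results in source order, while .race may not:

```raku
# .hyper preserves the original order of the list.
my @ordered = (1..20).hyper(:batch(2), :degree(4)).map(* ** 2);
say @ordered;   # always in source order: 1 4 9 ... 400

# .race returns batches as the workers finish, so the results
# may arrive out of source order (the same values, though).
my @any-order = (1..20).race(:batch(2), :degree(4)).map(* ** 2);
say @any-order;
```

Run the .race version a few times and you will occasionally see batches swap places.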

Each of these methods takes a :batch and a :degree parameter in case you want to customize the way the work is broken up and executed. Raku attempts to pick reasonable defaults, but tuning these for your particular setup will probably yield some improvement when you need to eke out even better performance.

The :degree option selects how many workers to start. For CPU bound work, it is generally best to pick a number equal to the number of CPU cores available. There’s no reason to go higher because you’ll never run more than that many jobs at once in such a case. However, if the work involves waiting on the disk or network, it’s very likely your code will be paused for milliseconds or longer waiting for IO. In those cases, it can be wise to increase :degree to several times the number of CPUs to account for the wait time.

The :batch option decides how to break the work up. A large number is useful when the work to be done is fast. This will keep your throughput high. A small number, even down to 1, is reasonable when the work is long or you want to get each result as soon as you can.

So, with that in mind, we could further tune the work above like this:

my $fibonacci = (1, 1, * + * ... *);
my $double-sum = $fibonacci[^4000].race(:batch(1000), :4degree).grep(*.is-prime).map(* * 2).reduce(* + *);

In this case, tuning doesn’t make a large difference on my 4-core laptop, but when there are more than 4 cores on your system, it is likely that tuning will help some.

So, any time you have a task where you need to iterate through items and operate on them and have CPU time to spare to speed them up, consider employing .hyper or .race in your code.

Cheers.


  1. Raku does some amount of optimization already for this operation. For example, it does not perform a separate loop for .grep and .map. These operations will be chained together in a sequence so there's essentially only a single iteration being performed.  ↩

»

Let’s consider now how to solve a big problem with concurrency. If you have an algorithmic problem with lots of data that needs to be processed, you want to maximize the amount of load you can place on the available CPU cores to process it as quickly as possible. To demonstrate how we can go about this in Raku, we will consider Conway’s Game of Life, played on an effectively infinite game board.1

Let’s start by defining Conway’s Game of Life, just in case you haven’t run across it before. The Game of Life is a simulation invented by British mathematician John Conway. It is played on a simple grid where each square is called a cell. Each cell may be either alive or dead during a given turn. Each cell has 8 neighbors, which are the cells directly above, below, left, right, and each of the 4 diagonals. To determine the state of a cell for the next turn, you apply the following rules to the state of the cell and its neighbors on the current turn:

  1. Any live cell with fewer than two live neighbors dies.
  2. Any live cell with two or three live neighbors lives on.
  3. Any live cell with more than three live neighbors dies.
  4. Any dead cell with exactly three live neighbors comes to life.

If you want more details about this, you should check out the Wikipedia article on Conway’s Game of Life.

I have implemented Conway’s Game of Life in a nice long program with graphics (or text) output and all. However, it’s about 400 lines of code, so I’m not going to include all of it here. You can check out the ongoing evolution of this project on my github.

The simulator has a role named Game::Life::Player which defines the requirements for the player object. This object is responsible for performing the rules of the game. Specifically, the .next-turn-for method is given an immutable copy of the current game board, a set of bounds, and a mutable copy of the next game board to write to. It is responsible for turning the current board into the board for the next turn according to the rules just mentioned.

Here is a copy of that from the Game::Life::Player::Basic implementation, which is basically the simplest way to go about this:

role Game::Life::Player {
    ...
    method next-turn-for-cell(
        Int:D $x,
        Int:D $y,
        Board:D $current,
        Board:D $next,
    ) {
        # Is the cell currently live?
        my $live      = $current.cell($x, $y);

        # How many live neighbors does it currently have?
        my $neighbors = [+] $current.neighbors($x, $y);

        # If alive and has too many or too few neighbors, die.
        if $live && !(2 <= $neighbors <= 3) {
            return $next.kill($x, $y);
        }

        # if dead and has the right number of neighbors, come to life.
        elsif !$live && $neighbors == 3 {
            return $next.raise($x, $y);
        }

        else {
            return Nil;
        }
    }
}

class Game::Life::Player::Basic does Game::Life::Player {
    ...
    method next-turn-for(
        Int:D $l,
        Int:D $t,
        Int:D $r,
        Int:D $b,
        Board:D $current,
        Board:D $next,
    ) {
        for $l..$r -> $x {
            for $t..$b -> $y {
                self.next-turn-for-cell($x, $y, $current, $next);
            }
        }
    }
}

The implementation simply iterates through every cell within the boundary and runs .next-turn-for-cell on it. This method is implemented in the role and just implements the rules as they apply to a single cell. Easy.2

Iterating through this in a single chunk is going to take a long time even for relatively small playing fields. To improve this situation, we can divide the work up into reasonable sized chunks and process each in a separate task. With multiple threads, we should be able to cut the time required to do the work down by up to a factor of N, where N is the number of cores available for computation. In reality, you will get somewhat less than that, but we should definitely be able to improve speed this way.

How might we do it? Here’s one possible solution that makes sure we never process chunks that are larger than 20 by 20, making for about 400 computations in a row. Gaining maximum efficiency requires some tuning, so a given system might do better with different numbers, but you get the idea.

Here’s an implementation of parallel-next-turn-for that’s part of the Game::Life::Player::DivideAndConquer player class:

class Game::Life::Player::DivideAndConquer is Game::Life::Player::Basic {
    ...
    method parallel-next-turn-for(
        Int:D $l,
        Int:D $t,
        Int:D $r,
        Int:D $b,
        Board:D $current,
        Board:D $next,
    ) {
        my @jobs = gather {
            if $r - $l > 20 {
                my $m = ceiling($l + ($r - $l)/2);
                #dd $l, $m, $r;

                take start self.parallel-next-turn-for($l, $t, $m - 1, $b, $current, $next);
                take start self.parallel-next-turn-for($m, $t, $r, $b, $current, $next);
            }

            elsif $b - $t > 20 {
                my $m = ceiling($t + ($b - $t)/2);
                #dd $t, $m, $b;

                take start self.parallel-next-turn-for($l, $t, $r, $m - 1, $current, $next);
                take start self.parallel-next-turn-for($l, $m, $r, $b, $current, $next);
            }

            else {
                take start self.next-turn-for($l, $t, $r, $b, $current, $next);
            }
        }

        await Promise.allof(@jobs);
    }
}

This takes the same input as before, but if there are too many columns to process, we cut the work in half by columns. If the columns are reasonable, but there are too many rows, we cut the work in half by rows. If the size is just right, we use the next-turn-for we inherited from Game::Life::Player::Basic.

Whether we split into two tasks or just do the work for some section of cells, we use start blocks to schedule the work and then await the outcome. Subdividing this way means we create a hierarchy of tasks that could subdivide and subdivide again. The Raku scheduler will then run the tasks as threads become available.

On my 2015 MacBook Pro, the game runs through 200 turns of the Gosper glider gun in around 35 seconds using 100% CPU when running sequentially. The same program runs in around 20–25 seconds at close to 300% CPU when running in parallel. It would probably be faster still if I didn’t also have the rendering task occasionally using CPU to redraw the graphics window. But what fun would that be?

So, that’s a concurrency pattern you can employ when you have multiple cores available and an algorithm that lends itself to being broken up into parts.

Cheers.


  1. I am not at all claiming this is an efficient way overall to simulate Conway's Game of Life, but I am demonstrating how to take one version of that simulator and make it more efficient with the use of concurrency.  ↩

  2. This is grossly inefficient as large sections of the board are very likely to be blank. As I already mentioned, though, my goal here is not to implement an efficient player, but to show how we can improve efficiency using concurrency.  ↩

»

What’s faster than locks? Compare-and-swap. Modern CPUs have multiple cores. As such, all modern CPUs must have tools for performing absolutely atomic operations to allow those multiple cores to work together. One of those operations is the compare-and-swap or cas operation.

In the abstract, a cas operation takes three arguments: a variable to modify, a given value, and a new value. The variable is set to the new value only if the current value held by the variable is equal to the given value, and this is performed as a single operation guaranteed to be safe from interference by any other concurrent operation. Along the way, the system sets a flag marking the success or failure of the operation. If the variable has a different value than expected, the variable is left unchanged and the operation fails. To complete an atomic change, you repeatedly run a calculation and end with a cas operation until it succeeds. That may not sound very efficient, but it turns out that in most cases, it is faster than locks.

Raku provides direct access to this operation through the cas function on the atomicint type and also through a number of operators which help you perform these atomic changes.

To demonstrate one possible use of atomicint, let’s first consider the ATM problem. Two people have access to a bank account. Let’s say the account has $1000 in it at the beginning of the day. Person One withdraws $100 from an ATM in city center and Person Two deposits $250 at the ATM in the airport. At the end of the day, we clearly want the new balance to be $1150. However, if both transactions happen simultaneously, that is not guaranteed unless we take some care to guarantee it.

Let’s start by writing our code in the naïve way:

my Int $balance = 1000;
await (start { $balance = $balance - 100 }),  # one
      (start { $balance = $balance + 250 });  # two
say $balance;

Unfortunately, it is now possible to end up with a balance that is different from $1150. This is because if these two tasks actually run simultaneously, they perform operations that could be interleaved like this:

  1. Block one reads a $balance of 1000.
  2. Block two reads a $balance of 1000.
  3. Block one subtracts 100 from 1000 to get 900.
  4. Block two adds 250 to 1000 to get 1250.
  5. Block one sets the $balance to 900.
  6. Block two sets the $balance to 1250.

The read and write operations must be executed sequentially in order for this code to work correctly. We could use a lock or semaphore or some other traditional construct to form a critical section around modifications to the variable, but these are typically pretty dang slow.

Instead, we can use a cas operation to make each update atomic. So, if we rewrite the above code using atomicint and the atomic operators, we end up with this:

my atomicint $balance = 1000;
await (start { $balance ⚛-= 100 }),  # one
      (start { $balance ⚛+= 250 });  # two
say $balance;

Here the result will always be the expected 1150. Let’s assume these two tasks run simultaneously exactly as before, but each update is now performed with a compare-and-swap underneath rather than a separate read and regular set. It would play out like this:

  1. Block one reads a $balance of 1000.
  2. Block two reads a $balance of 1000.
  3. Block one subtracts 100 from 1000 to get 900.
  4. Block two adds 250 to 1000 to get 1250.
  5. Block one performs compare-and-swap $balance from 1000 to 900 and succeeds.
  6. Block two performs compare-and-swap $balance from 1000 to 1250 and fails.
  7. Block two reads a $balance of 900.
  8. Block two adds 250 to 900 to get 1150.
  9. Block two performs compare-and-swap $balance from 900 to 1150 and succeeds.

The extra steps 7–9 occur because in a cas operation, the resolution of failure requires repeating the operation until it succeeds. Unless the levels of contention for write to a single variable are extreme, this should not lead to any task failing indefinitely.

If you don’t like the atom symbol in your code, that’s fine. There’s a “Texas” (plain ASCII) function for performing each of the operations provided by an atomic operator. The atomicint type provides the following operations:

  • atomic-assign or ⚛=
  • atomic-fetch or the ⚛ prefix for performing atomic reads of the value
  • atomic-fetch-inc or the ⚛++ postfix
  • atomic-fetch-dec or the ⚛-- postfix
  • atomic-fetch-add or ⚛+=
  • atomic-fetch-sub or ⚛-=
  • atomic-inc-fetch or the ++⚛ prefix
  • atomic-dec-fetch or the --⚛ prefix
  • cas

Let’s quickly consider the cas function itself, which underlies the implementation of the other operators. This operation allows you to implement any operation that involves a compare-and-swap of an atomicint. It has two basic forms, and just to demonstrate how they work, we can re-implement our ATM problem from above using both forms.

First, consider this program using the cas function:

my atomicint $balance = 1000;
sub update-balance($change-by) {
    my $new-balance;
    loop {
        my $old-balance = ⚛$balance;
        $new-balance = $old-balance + $change-by;
        if cas($balance, $old-balance, $new-balance) == $old-balance {
            last;
        }
    }
    return $new-balance;
}
await (start update-balance(-100)),  # one
      (start update-balance(250));   # two
say $balance;

This is functionally identical to, though more verbose than, the operation above. It basically gives you direct access to the cas operation itself and lets you control the number of retries and the specific operation performed. The function works something like this, though the entire operation is atomic:

multi sub cas(atomicint $target is rw, int $expected-value, int $new-value --> int) {
    my int $seen = $target;
    if $seen == $expected-value {
        $target = $new-value;
    }
    return $seen;
}

That means it will return the value it saw in every case, but will have changed the value stored in the target when what it saw matches what you expected.

There is a second form of this function, which can help streamline your code a bit by performing the looping for you. Solving the ATM problem yet again, we can write it like this:

my atomicint $balance = 1000;
sub update-balance($change-by) {
    cas $balance, -> $seen-value { $seen-value + $change-by }
}
await (start update-balance(-100)),  # one
      (start update-balance(250));   # two
say $balance;

This greatly shortens the code required to implement this operation. It does so by performing the given operation repeatedly in a loop until the cas operation succeeds. In this case, the cas function is defined like this, but again, using atomic operations where appropriate:

multi sub cas(atomicint $target is rw, &operation) {
    loop {
        my int $seen = $target;
        my $new-value = operation($seen);
        if $seen == $target { # still equal?
            $target = $new-value;
            return $new-value;
        }
        # otherwise another thread changed $target; try again
    }
}

Be aware that the cas subroutine alternative that takes a function returns the new value that was set, while the cas that takes two integers returns the value that was seen. That is, this second, more compact form treats the return value like an assignment operation, but the way the other form works does not allow for that treatment.
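To make the difference between the two return values concrete, here is a small illustrative sketch (not from the original post):

```raku
my atomicint $x = 5;

# Function form: returns the NEW value it successfully stored.
my $new = cas($x, -> $seen { $seen + 1 });
say $new;    # 6

# Two-value form: returns the value it SAW. A match with the
# expected value means the swap happened; a mismatch means it
# failed and $x is left unchanged.
my $seen = cas($x, 6, 10);
say $seen;   # 6, and $x now holds 10
say ⚛$x;     # 10
```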

I’d really like to share with you how cas can be used on any scalar value in Raku, but this post has already gone long. I will cover that later this Advent.

Cheers.

»

When writing concurrent code in Raku, we want to avoid sharing data between tasks. This is because code that shares no data is automatically safe and doesn’t have to worry about interdependencies with other code. So, when you can, you should do your work through Supply, Promise, and Channel objects that are then synchronized together by a central thread. That way, all the state changes are safe.

This is not always practical, though. Sometimes it really is more efficient to share data that can change between tasks running on separate threads. However, such sharing is inherently unsafe.

For example, here’s a very simple multi-threaded application in Raku that is not thread safe and will lead to incorrect results:

my $x = 0;
my @p = (
    start for ^100 { $x++; sleep rand/10; },
    start for ^100 { $x++; sleep rand/10; },
);
await Promise.allof(@p);
say $x;

We might expect the value of $x to be 200, but it is very unlikely it will be 200. It will almost certainly be lower. This is because the simple $x++ operation needs to:

  1. Read the value of $x.
  2. Add one to the value read from $x.
  3. Store the calculated value back into $x.

If it happens that the first task performs step 1 and 2 and then second task performs steps 1 through 3 before the first task completes step 3, at least one of the increment operations will be lost. Over the course of 100 iterations in each task, I would expect 5 or 10 writes to be lost and each run is likely to give a slightly different answer. This is what it means to be unsafe when it comes to concurrency.

For this particular program, the three steps above constitute a critical section. A critical section is a piece of code that must be executed by only one thread at a time if it is to work correctly. In the Raku code itself, the critical section is just the $x++ statement in each loop.

One mechanism for ensuring a critical section in your code is handled in a thread safe way is to use a mutual exclusion lock. Raku provides the Lock class which can be used for this purpose.

When you create a Lock object, you can .lock or .unlock the lock in your code. Calling the .lock method will block your code from running further if any other code has called .lock on the same object without calling .unlock. As soon as the thread holding the lock calls .unlock, another thread waiting for the lock to release will be allowed to continue.

In our example above, we might modify it as follows to make it thread safe:

my $x = 0;
my $lock = Lock.new;
my @p = (
    start for ^100 { $lock.lock; $x++; $lock.unlock; sleep rand/10; },
    start for ^100 { $lock.protect: { $x++ }; sleep rand/10; }
);
await Promise.allof(@p);
say $x;

I didn’t mention .protect before, but it does the same thing as calling .lock, running the given block, and then calling .unlock. It has the advantage, though, of making sure the .unlock call happens even if something goes wrong inside the block. In the first loop above where we use .lock and .unlock, it is possible for a thrown exception to cause the lock to remain permanently locked. Using .protect automatically avoids this risk, so it’s the preferred way to use Lock.
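A small sketch (not from the original post) of that guarantee in action, showing the lock is still usable after the protected block throws:

```raku
my $lock = Lock.new;

# .protect releases the lock even when the block throws;
# the exception is then rethrown, so we wrap it in try.
try $lock.protect: { die 'something went wrong' };

# The lock was released despite the exception, so other
# threads (and this one) can still take it normally.
$lock.protect: { say 'lock is still usable' };
```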

Before I conclude, I want to mention a couple negatives to locks. First, locks do not perform very well. They are simple to implement and straightforward to use, but mutual exclusion can come at a high cost to performance. You probably want to make sure your locks are used sparingly and used only to protect the critical sections.

The other major drawback is that when locks are used, you get thread safety, but you may add the risk of deadlock. I mentioned one deadlock risk already: a task causing an error that prevents a lock from being unlocked. That lock is now dead and no task can take it. When multiple locks are involved, the risk of deadlock can be much more subtle and very difficult to find. Unlike Supply or Promise, a Lock is not safely composable. This means two libraries using locks to secure themselves might deadlock when used together if you are not careful.

Despite their drawbacks, locks are useful things for making code thread safe. Later in this Advent Calendar we will use locks to demonstrate how to create thread safe data structures.

Cheers.

»

The react block in Raku is the primary means of re-synchronizing asynchronous coding activity. Using it, you can easily pull together promises, supplies, and channels to make a coherent whole of your program or a subsystem.

A react block itself can run any code you want plus one or more whenever blocks. The code in the block will run once, and the block will exit either when the done routine is called or when all the sources associated with whenever blocks are finished (i.e., all promises are kept or broken, all supplies are done or have quit, and all channels are closed or have failed).
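
As a minimal illustration of those exit conditions, here is a tiny sketch (Promise.in(1) is just a stand-in for any asynchronous source):

```raku
# The body runs once; the block then waits on its whenever sources.
react {
    say 'waiting...';
    whenever Promise.in(1) {
        say 'one second passed';
        done;   # optional here; the kept Promise also lets the block finish
    }
}
```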

Aside from the syntax, the key thing to note about a react block is that all code inside of the block will always run as if single-threaded (i.e., multiple threads might be employed, but never will any code within this block be run concurrently).

Let’s take a look at an example react block:

my $commands = Channel.new;
my $input = Supplier.new;
my $output = Supplier.new;
my $quit = Promise.new;
react {
    print '> ';

    start loop { $input.emit: $*IN.getc }

    whenever $input.Supply.lines.map({ .trim }) {
        when /^add \s+ (\d+) \s+ (\d+)$/ { $commands.send: ('add', +$0, +$1) }
        when /^sub \s+ (\d+) \s+ (\d+)$/ { $commands.send: ('sub', +$0, +$1) }
        when 'quit' | 'exit' { $quit.keep }
        default { $output.emit: 'syntax error' }
    }

    whenever $commands -> @command {
        multi doit('add', Int $a, Int $b) { $a + $b }
        multi doit('sub', Int $a, Int $b) { $a - $b }

        $output.emit: doit(|@command);
    }

    whenever $output.Supply { .say; print '> ' }

    whenever $quit {
        say 'Quitting.';
        done;
    }
}

This program provides a small interactive shell that can perform addition and subtraction. When run, you might use it as follows:

> add 4 5
9
> sub 10 7
3
> exit
Quitting.

As you can see, the code operates with a couple of Supply objects, a Channel, and a Promise. Each of them works with whenever in the expected way. All of the code within the react block runs as if in a single thread (though there’s no particular guarantee that only a single thread is used, only that no code within the block will run concurrently).

Running from a single task with no concurrency does, however, present a problem in this case. The $*IN file handle only performs blocking reads, even if you use the .Supply method to get the data asynchronously. Therefore, we must pull input in a task running in a background thread, which is why we put a start before the loop that reads character input. Without this concurrent task, we’d have to hit the Return key extra times to give the other whenever clauses a chance to run.

That said, we could move the work of the other whenever blocks into separate concurrent tasks and just pull each together in this react block and it would work just as well. The goal of the react block is to synchronize the asynchronous work in a straightforward syntax. I think it does the job pretty well.

Cheers.

»

A great deal of concurrency-oriented coding in Raku depends on the use of a Scheduler. Many async operations depend on the default scheduler created by the VM at the start of runtime. You can access this via the dynamic variable named $*SCHEDULER.

The most important feature of a Scheduler is the .cue method. Calling that method with a code reference will schedule the work for execution. The type of scheduler will determine what exactly that means.

That said, this is a low-level interface and you probably shouldn’t be calling .cue in most code. The best practice is to rely on high-level tools like start blocks, which do this for you and construct a Promise you can use to monitor the work.

Every scheduler provides three methods:

  1. The .uncaught_handler is an accessor that returns, or can be set to, a routine which is called whenever a cued task throws an exception that is not handled by the task code itself. If no handler is provided and a cued task throws an exception, the application will exit on that exception. If you use the high-level concurrency tools, such as start blocks, the .uncaught_handler will never be used because they each provide their own exception handling.

  2. The .cue method is used to add a task to the schedule. The scheduler will perform that task as resources allow (depending on how the scheduler operates).

  3. The .loads method returns an integer indicating the current load on the scheduler. This is an indication of the size of the current job queue.
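
To make the low-level interface concrete, here is a small sketch using .cue on the default scheduler directly (again, in real code you would normally reach for start instead):

```raku
# Low-level use of the default scheduler; prefer start in real code.
$*SCHEDULER.cue({ say "cued on thread $*THREAD.id()" });

# :in delays the task; :catch receives any exception it throws.
$*SCHEDULER.cue({ die 'oops' }, in => 0.5, catch => { warn "caught: $_" });

sleep 1;   # give the cued tasks time to finish before the program exits
```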

So, you could build a very simple scheduler like this:

class MyScheduler does Scheduler {
    method cue(&code, Instant :$at, :$in, :$every, :$times = 1, :&catch) {
        sleep $at - now if $at && $at > now;
        sleep $in if $in;


        for ^$times {
            code();
            CATCH { 
                default {
                    if &catch {
                        catch($_);
                    }
                    elsif self.uncaught_handler {
                        self.uncaught_handler.($_);
                    }
                    else {
                        .throw;
                    }
                }
            }
            sleep $every if $every;
        }

        class { method cancel() { } }.new
    }

    # We don't really queue jobs, so always return 0 for the load
    method loads(--> 0) { } 
}

This is somewhat similar to what the CurrentThreadScheduler does.

Rakudo has two schedulers built-in:

  • ThreadPoolScheduler is the usual default $*SCHEDULER. When it is constructed, you can set the number of threads it is permitted to use simultaneously. It then manages a pool of threads and will schedule cued tasks on those threads. As tasks complete, freeing up threads, the next tasks will be scheduled to run. Tasks may run concurrently with this scheduler. When .cue returns, the tasks may not have started yet. The .cancel method of the returned object may be used to request cancelling the work of a given task.

  • CurrentThreadScheduler is an alternate scheduler. It simply executes the task immediately on the current thread and returns after the task is complete. The returned cancellation object has a .cancel method, but it is a no-op, as the work will always have completed by the time .cue returns.

Many async methods, such as the .start method on Promise, take a named :scheduler argument where you can pass a custom scheduler. In general, you can stick to the default scheduler. Probably the most common adjustment to a scheduler would be to change the number of threads in the thread pool or switch to using the current thread scheduler under some circumstances. Chances are you will need to do neither of these. And if you need something exotic, it may be reasonable to define your own scheduler as well. Things to consider.
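
For instance, here is a sketch of passing an explicit scheduler to Promise.start to force a task onto the current thread instead of the shared pool:

```raku
# Run a task synchronously on the current thread rather than the pool.
my $p = Promise.start(
    { say "ran on thread $*THREAD.id()" },
    scheduler => CurrentThreadScheduler.new,
);
await $p;
```

Because the current-thread scheduler runs the task before returning, the Promise is already kept by the time await sees it.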

Cheers.

»

Warning! We are delving into the inner depths of Raku now. Threads are a low-level API and should be avoided by almost all applications. However, if your particular application needs direct Thread access, it is here for you.1

Use of the Thread class in Raku is straightforward and looks very similar to what you would expect if you are familiar with threading tools in other languages:

my $t = Thread.start:
    name => 'Background task',
    :app_lifetime,
    sub {
        if rand > 0.5 { say 'weeble' }
        else { say 'wobble' }
    },
    ;

say "Starting $t.id(): $t.name()";
say "Main App Thread is $*THREAD.id(): $*THREAD.name()";

$t.finish; # wait for the thread to stop

Give the Thread.start method some code to run and you’re off. The name and app_lifetime options are optional. If app_lifetime is False (which is the default), the thread will be terminated when the main application thread terminates. If set to True, the application will continue to run as long as this thread is running. Under normal circumstances, only the main thread of your application has this privilege.

All code runs within a thread. Your code can access the thread it is running in via the dynamic variable named $*THREAD. This can be helpful for pulling the .id when debugging, to help you understand which thread a task is running in at any given moment.

When you want to pause the current thread to wait for another thread to finish, use the .finish method (or .join, which is a synonym for .finish).

Another way to run a thread is to use a combination of .new and .run. This is similar to .start, but code must be passed as a named argument to .new:

my $t2 = Thread.new:
    name => 'Another task',
    code => sub {
        loop {
            say 'stuff';
            sleep 1;
        }
    },
    ;

# The thread does not start until we...
$t2.run;

# And then we'd better wait for it or we'll exit immediately
$t2.finish;

For the most part, I mention threads in this advent calendar as a way of describing the “lanes” in which code runs. However, I will make use of $*THREAD.id from time to time to help illustrate that code does run in different threads. Otherwise, I will generally ignore the Thread object directly.

Almost all Raku programs should stick to using start blocks or Promise.start to start tasks to run on another thread. You should only make use of Thread directly if you really need it, which is probably never or close to it for most Raku developers.
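
For comparison, the first Thread example above could be written with the preferred high-level tools in just a few lines:

```raku
# The high-level equivalent of the earlier Thread.start example:
my $p = start {
    if rand > 0.5 { say 'weeble' }
    else          { say 'wobble' }
}
await $p;   # like .finish, waits for the task to complete
```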

Cheers.


  1. As a point of clarification, a Thread object does not necessarily represent a specific OS thread, but it should get you as close as the implementation is able.  ↩

»

A Channel, in Raku, is an asynchronous queue of data. You can feed data in to one end of the queue and receive data at the other end safely, even when multiple threads are involved.

Let’s consider a variant of the Dining Philosophers Problem: We have five philosophers eating soup at the table. They do not talk to one another because they are too busy thinking about philosophy. However, there are only two spoons. Each time a philosopher wishes to take a sip of soup, she needs to acquire a spoon. Fortunately, each philosopher is willing to share and sets her spoon back at the center of the table after finishing each bite.

We can model this problem like this:

my $table = Channel.new;
my @philosophers = (^5).map: -> $p { 
    start {
        my $sips-left = 100;
        while $sips-left > 0 {
            my $spoon = $table.receive;
            say "Philosopher $p takes a sip with the $spoon.";
            $sips-left--;
            sleep rand;
            $table.send($spoon);
            sleep rand;
        }
    }
}
$table.send: 'wooden spoon';
$table.send: 'bamboo spoon';
await Promise.allof(@philosophers);

Here we have five tasks running in five threads, each contending for one of two resources. They will each take 100 sips of soup. Running this program will give you 500 lines of output similar to this:

...
Philosopher 0 takes a sip with the wooden spoon.
Philosopher 2 takes a sip with the bamboo spoon.
Philosopher 3 takes a sip with the wooden spoon.
Philosopher 1 takes a sip with the bamboo spoon.
Philosopher 4 takes a sip with the bamboo spoon.
Philosopher 2 takes a sip with the bamboo spoon.
Philosopher 0 takes a sip with the wooden spoon.
Philosopher 1 takes a sip with the wooden spoon.
Philosopher 0 takes a sip with the wooden spoon.
...

The code itself is very simple. We start five tasks, each representing a philosopher. Each philosopher calls .receive to take the next available spoon; this method blocks until a spoon becomes available. The philosopher takes a sip and then uses .send to return the spoon to the table for anyone else to use. Eventually, each philosopher finishes 100 sips and the Promise returned by start is kept.

The main thread kicks off the process by placing two spoons on the table using .send. Then, it uses await to keep the program running until all the philosopher tasks are complete.

Channels have very low overhead as far as CPU is concerned: senders never wait on receivers. Receivers can either block until something is available in the queue using .receive, or check for items without blocking using .poll. The cost is transferred to memory: the Channel must store every sent item internally until it is received, so an unchecked queue will keep growing until the program runs out of memory.
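
A quick sketch of the non-blocking side of that interface: .poll returns the next item immediately, or Nil when nothing is queued, which makes it handy for draining a channel without blocking.

```raku
my $ch = Channel.new;
$ch.send($_) for 1..3;
$ch.close;

# .poll returns the next item, or Nil when the queue is empty.
while $ch.poll -> $item {
    say "got $item";
}
```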

Therefore, a Channel is helpful when you have resources or messages you want to distribute, but not share, between tasks, or when you just need point-to-point communication. Job queues, resource pools, and peer-to-peer task communication are good examples of the sorts of problems Channels are ideally suited for.

Cheers.