Concurrency In Clojure
Getting started with core.async, Vars, Atoms, Agents & Refs
Introduction
Clojure offers several primitives for building concurrent programs. When I started looking at Clojure, the topic was fairly blurry in my mind and I couldn’t understand why it was done differently.
The way I see it now is:
The concurrency primitives of Clojure were made for the same reason we went from assembly to higher-level languages: easier abstractions that make your program simpler to reason about.
At the end of the day, they still rely on the low-level primitives found in other languages, such as mutexes and semaphores, but the abstractions they create allow for programs that are not only safer but also simpler to understand.
Once we understand that, we need to grasp that, fundamentally, there are two ways for subsystems to communicate and interact:
- Message passing communication: that’s what we find in nature! Independent subsystems that communicate with each other by sending messages.
- Shared memory communication: usually relying on some form of locking to coordinate between threads.
We’ll introduce each of those approaches and provide pointers to learn more about these techniques.
Message passing communication: core.async
This mechanism allows different subsystems to communicate with each other using message queues. In Clojure, the interface to these queues is the channel. A channel is a simple beast providing two basic operations:
- put a message to a channel
- take a message from a channel
From an architectural standpoint, this model allows subsystems to remain independent from each other.
The simplest possible code
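As a minimal sketch (not necessarily the snippet the article originally showed here), a producer running on its own thread can hand a message to a consumer through a channel. The name channel and the message "hello" are illustrative choices, and the core.async dependency is assumed to be on the classpath.

```clojure
(require '[clojure.core.async :as async])

;; An unbuffered channel: a put completes only when a take is ready.
(def channel (async/chan))

;; Producer: runs on its own thread and puts one message on the channel.
(async/thread
  (async/>!! channel "hello"))

;; Consumer: blocks the current thread until a message is available.
(def received (async/<!! channel))

(println received) ; prints hello
```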
Tips
- If you’re using Leiningen, add core.async as a dependency in your project.clj so that you have something like this:
:dependencies [[org.clojure/clojure "1.8.0"] [org.clojure/core.async "0.2.395"]]
What else?
Everything else about core.async is an implementation detail that answers questions such as:
- Can we let a producer or a consumer block our thread? In other words, should things happen synchronously or asynchronously? The answer depends on your application, which is why there are primitives for both cases:
  - <! and <!! take a message from a channel. The first one is asynchronous (it parks inside a go block without holding a thread) and the second one is synchronous (it blocks the calling thread).
  - >! and >!! put a value on a channel. The first one is asynchronous (it parks inside a go block) and the second one is synchronous (it blocks the calling thread).
- Do you want your code to run on a separate thread, or on a pool of threads managed by core.async?
  - go: reuses a small pool of threads managed by core.async (a good choice for CPU-bound work; avoid blocking calls inside it)
  - thread: runs the body on a separate thread (a good choice for work that isn’t CPU-bound, such as blocking I/O)
- Manage how producers and consumers affect each other, using different buffering strategies:
  - losing a message is not acceptable (lossless strategy). The producer is blocked for as long as the consumer needs to make room in the buffer. That’s the fixed-length buffer strategy:
    (def channel (async/chan 10))
  - losing a message is acceptable (a lossy way of handling backpressure):
    - drop the oldest message strategy:
      (def channel (async/chan (async/sliding-buffer 10)))
    - drop the newest message strategy:
      (def channel (async/chan (async/dropping-buffer 10)))
- Manage messages from multiple channels simultaneously: alt! and alt!!
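The points in the list above can be tried out at the REPL. The following is a small sketch rather than a reference implementation: fill-and-drain is a hypothetical helper written for this illustration, and the buffer size of 3 is arbitrary.

```clojure
(require '[clojure.core.async :as async])

;; go + >! : the put parks inside the go block instead of blocking a thread.
(def from-go
  (let [ch (async/chan)]
    (async/go (async/>! ch :from-go))
    (async/<!! ch)))                      ; => :from-go

;; Hypothetical helper: put 1..5 on a channel backed by `buffer`
;; with no consumer running, then read back whatever was kept.
(defn fill-and-drain [buffer]
  (let [ch (async/chan buffer)]
    (doseq [n (range 1 6)]
      (async/>!! ch n))                   ; never blocks with these two buffers
    (async/close! ch)
    (async/<!! (async/into [] ch))))

(fill-and-drain (async/dropping-buffer 3)) ; => [1 2 3], newest values dropped
(fill-and-drain (async/sliding-buffer 3))  ; => [3 4 5], oldest values dropped

;; alt!! waits on several channels and runs the clause of the one that is ready.
(def winner
  (let [fast (async/chan 1)
        slow (async/chan 1)]
    (async/>!! fast :ping)                ; only `fast` has a value to offer
    (async/alt!!
      fast ([v] [:fast v])
      slow ([v] [:slow v]))))             ; => [:fast :ping]
```

With a plain fixed-size buffer such as (async/chan 3), the fourth put in fill-and-drain would block forever because nothing is consuming, which is exactly the lossless trade-off described above.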
Going deeper:
- This amazing video, and the corresponding code
- More information on channels on the official documentation
- Full API documentation
Shared memory communication: vars, refs, atoms & agents
Communication across threads has to be done carefully to avoid problems such as deadlocks. Situations like the dining philosophers problem do happen, and they are the main reason to be cautious about how we do things. Clojure has powerful primitives for dealing with concurrency that allow shared memory communication.
Vars
A var is the simplest construct, and one that we should already have encountered: it is what def creates.
Vars are global values. If we rebind a dynamic var from a thread (with binding), the change won’t be seen by another thread. Vars only share a default value (aka root value) with every thread.
Key takeaway: a var is not a way to share a resource between multiple threads.
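To make the thread-local behaviour concrete, here is a small sketch. The names *greeting* and read-from-other-thread are made up for this illustration. It uses a plain java.lang.Thread on purpose, because helpers such as future convey the caller’s bindings to the new thread and would hide the effect.

```clojure
;; A dynamic var with a root (default) value shared by every thread.
(def ^:dynamic *greeting* "hello")

;; Hypothetical helper: read *greeting* from a brand new plain thread.
(defn read-from-other-thread []
  (let [result (promise)
        t      (Thread. (fn [] (deliver result *greeting*)))]
    (.start t)
    (.join t)
    @result))

(def seen
  (binding [*greeting* "bonjour"]          ; thread-local rebinding
    {:this-thread  *greeting*
     :other-thread (read-from-other-thread)}))

seen ; => {:this-thread "bonjour", :other-thread "hello"}
```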
Tips: The full documentation for a var
Atoms and Agents
Atoms and agents make sharing resources across threads safe. They are similar in everything except how they behave with respect to asynchrony.
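A minimal sketch of both in action, assuming two illustrative counters named visits-atom and visits-agent:

```clojure
(def visits-atom  (atom 0))
(def visits-agent (agent 0))

;; Both are updated by passing a function from the old value to the new one.
(swap! visits-atom inc)    ; returns the new value, 1
(send visits-agent inc)    ; returns the agent itself, the update runs later

;; Wait until the agent has processed the actions sent from this thread.
(await visits-agent)

[@visits-atom @visits-agent] ; => [1 1]
```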
So far, agents and atoms behave in the same way. You update them by providing a callback that returns the new value the agent/atom will take. The only difference is the name of the function you use to change an agent or an atom: send and swap! respectively.
The difference between an agent and an atom is the following:
- an agent is a resource that can be shared across threads in an asynchronous manner
- an atom is a resource that can be shared across threads in a synchronous manner
In other words, updating an atom can block your thread, updating an agent can’t.
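The difference shows up with a deliberately slow callback. This is a sketch under assumptions: slow-inc and the 200 ms pause are invented for the illustration.

```clojure
(def counter-atom  (atom 0))
(def counter-agent (agent 0))

;; Hypothetical slow callback: sleeps, then increments.
(defn slow-inc [n]
  (Thread/sleep 200)
  (inc n))

;; swap! runs slow-inc on the calling thread: this line takes about 200 ms.
(swap! counter-atom slow-inc)
@counter-atom                      ; => 1

;; send returns right away; slow-inc runs on another thread.
(send counter-agent slow-inc)
(def right-after-send @counter-agent) ; most likely still 0 at this point

;; await blocks until the actions sent so far have completed.
(await counter-agent)
@counter-agent                     ; => 1
```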
What we should note here is:
- the atom blocks the calling thread: swap! waits for the callback to complete, so dereferencing afterwards gives the new value
- the agent doesn’t block the calling thread: it keeps its previous value and is updated once the callback completes.
An example of agents was demonstrated by Rich Hickey to simulate ants. If you think about it, it makes a lot of sense, as we’d want to simulate each ant as a separate entity behaving not in a sequential manner (aka one after the other) but concurrently in a non-blocking way (aka asynchronously).
A little secret about agents: we as programmers have two ways to update their value. Until now we’ve seen the send function, but there’s another one: the send-off function, which is used in the same way. The only difference is that send reuses a limited number of threads from a fixed-size pool defined by Clojure itself, without the overhead of creating new threads, while send-off uses a separate pool that creates new threads with the OS whenever it needs them. The key takeaway here is:
- use send for CPU-bound operations, which keep the threads of the fixed pool busy for a short time only
- use send-off for operations that are not CPU-bound (network requests and other I/O operations), so that you don’t block the pool of threads managed by Clojure.
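A short sketch of send-off for blocking work. The agent page, the function fetch-page, and the URL are hypothetical, and Thread/sleep stands in for a real network call.

```clojure
(def page (agent nil))

;; Hypothetical blocking action: receives the agent's current value
;; followed by the extra arguments passed to send-off.
(defn fetch-page [_previous url]
  (Thread/sleep 100)               ; stands in for a blocking network request
  (str "content of " url))

;; send-off keeps this blocking call away from the fixed pool used by send.
(send-off page fetch-page "http://example.com")

(await page)
@page ; => "content of http://example.com"
```

When running this as a script rather than at the REPL, calling (shutdown-agents) at the end lets the JVM exit promptly.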
What we haven’t talked about here is:
- How to manage errors occurring while updating an agent/atom
- Validators, or how you can ensure an atom always remains valid from the perspective of your application
- The lifecycle of an agent
Going further
Refs
A ref has an interesting property: it behaves in the same way as a transaction that would happen in a database, but in memory. We call this STM (software transactional memory).
The idea here is to allow multiple entities to change together in a way that can be retried or rolled back if an error occurs.
Let’s simulate a transfer of money from Paul’s account to Marie’s account.
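One way to sketch such a transfer, assuming starting balances of 100 and 50 and a hypothetical transfer! function (the article’s original snippet is not available here):

```clojure
(def paul  (ref 100))
(def marie (ref 50))

;; Hypothetical helper: move `amount` from one account to the other.
;; Both changes happen inside one transaction, or not at all.
(defn transfer! [from to amount]
  (dosync
    (when (< @from amount)
      ;; Throwing inside dosync aborts the transaction: no ref is changed.
      (throw (ex-info "Insufficient funds" {:balance @from :amount amount})))
    (alter from - amount)
    (alter to + amount)))

(transfer! paul marie 30)
[@paul @marie] ; => [70 80]
```

A transfer that is too large, such as (transfer! paul marie 1000), throws and leaves both balances untouched.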
There are two ways to update a ref’s state within a transaction: alter and commute. They are called in the same way but do different things on your behalf:
- alter is very strict and provides the safest approach to transactions: it uses the function you pass to change the value, and retries the transaction if it cannot guarantee that the value was unchanged since the start of the transaction
- commute is a bit less strict than alter: the transaction is not retried when another transaction has changed the same ref through commute in the meantime. Instead, the function is applied again to the latest value when the transaction commits. For cases where we don’t care about ordering, commute is good enough
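A counter is the typical case where ordering doesn’t matter, since increments can be applied in any order. The sketch below uses an illustrative ref named page-views, updated from ten threads at once.

```clojure
(def page-views (ref 0))

;; Ten workers, each recording 100 views in its own small transactions.
(let [workers (mapv (fn [_]
                      (future
                        (dotimes [_ 100]
                          (dosync (commute page-views inc)))))
                    (range 10))]
  ;; Wait for every worker to finish.
  (run! deref workers))

@page-views ; => 1000
```

Replacing commute with alter gives the same final value here, but concurrent transactions may then be retried when they conflict.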