Getting started with core.async, Vars, Atoms, Agents & Refs

Introduction

Clojure provides several primitives for building concurrent programs. When I started looking at Clojure, the topic was fairly blurry in my mind and I couldn’t understand why Clojure handled concurrency differently from other languages.

The way I see it now is:

Clojure's concurrency primitives were designed for the same reason we moved from assembly to higher-level languages: easier abstractions that make programs simpler to reason about.

At the end of the day, they still rely on the low-level primitives found in other languages, such as mutexes and semaphores, but the abstractions they create allow for programs that are not only safer but also simpler to understand and reason about.

Once we understand that, we need to grasp that, fundamentally, there are two ways for subsystems to communicate and interact:

  1. Message-passing communication: that’s what we find in nature! Independent subsystems communicate with each other by sending messages.
  2. Shared-memory communication: usually relying on some form of locking to coordinate between threads.

We’ll introduce each of those approaches and provide pointers for learning more about these techniques.

Message passing communication: Core.Async

This mechanism lets different subsystems communicate with each other through message queues. In Clojure, the interfaces to those queues are channels. A channel is a simple beast providing two basic operations:

  1. put a message to a channel
  2. take a message from a channel

From an architectural standpoint, this model allows subsystems to remain independent from each other.

The simplest possible code

(require '[clojure.core.async :as async])
;; create an unbuffered channel
(def channel (async/chan))

;; put a message into the channel from a go block
;; on an unbuffered channel a put waits until someone takes the value,
;; so doing it inside go keeps it from blocking the main thread
;; (inside go, use the parking put >! rather than the blocking >!!)
(async/go (async/>! channel "insert0"))

;; take the message from the channel (blocks the main thread until a value arrives)
(async/<!! channel)

Tips

  • If you’re using Leiningen, add core.async to the dependencies in your project.clj, so that you have something like this: :dependencies [[org.clojure/clojure "1.8.0"] [org.clojure/core.async "0.2.395"]]

What else?

Everything else in core.async is a matter of choices that answer questions such as:

  • Can we let a producer or a consumer block our thread? In other words, should things happen synchronously or asynchronously? The answer depends on your application, which is why there are primitives for both:
    • <! and <!! take a message from a channel. <! is asynchronous: it parks and can only be used inside a go block. <!! is synchronous: it blocks the calling thread.
    • >! and >!! put a value onto a channel. >! is asynchronous (it parks, only inside a go block); >!! is synchronous (it blocks the calling thread).
  • Should the code that produces or consumes messages run on its own thread, or on a thread pool managed by core.async?
    • go: runs the body on a small, fixed-size thread pool managed by core.async (a good choice for short, CPU-bound work; never do blocking I/O inside a go block)
    • thread: runs the body on its own thread, outside the go pool (a good choice for work that isn’t CPU-bound, such as blocking I/O)
  • How should producers and consumers affect each other? There are different strategies:
    • losing a message is not acceptable (lossless strategy): when the buffer is full, the producer blocks until the consumer has taken messages (backpressure). That’s the fixed-size buffer strategy: (def channel (async/chan 10))
    • losing a message is acceptable: the producer never blocks, and messages are discarded when the buffer is full:
      • drop the oldest message strategy: (def channel (async/chan (async/sliding-buffer 10)))
      • drop the newest message strategy: (def channel (async/chan (async/dropping-buffer 10)))
  • Handle messages from multiple channels at once: alt! and alt!! (and their function counterparts alts! and alts!!) wait on several channels and proceed with whichever one is ready first.
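The choices above can be tried out in a single REPL session. The following is a minimal sketch, assuming the core.async dependency from the tip above; `numbers`, `fill-and-drain`, `silent` and the other defined names are hypothetical. It also uses two core.async functions not covered here: `offer!` (a put that never blocks) and `timeout` (a channel that closes after the given number of milliseconds).

```clojure
(require '[clojure.core.async :as async])

;; --- blocking vs parking, thread vs go ---
;; producer on its own thread: the blocking put >!! is fine there
(def numbers (async/chan))
(async/thread
  (doseq [n [1 2 3]]
    (async/>!! numbers n))
  (async/close! numbers))

;; consumer in a go block: <! parks instead of tying up a go-pool thread;
;; it returns nil once `numbers` is closed and empty, which ends the loop
(def sum-ch
  (async/go
    (loop [total 0]
      (if-let [n (async/<! numbers)]
        (recur (+ total n))
        total))))

;; a go block returns a channel holding its result; <!! blocks the REPL thread
(def numbers-sum (async/<!! sum-ch)) ;; => 6

;; --- buffer strategies ---
(defn fill-and-drain
  "Hypothetical helper: puts 1..5 with no consumer, closes the channel,
  then returns whatever the buffer kept."
  [ch]
  (doseq [n (range 1 6)]
    ;; never blocks on a dropping or sliding buffer: when full, a message is discarded
    (async/>!! ch n))
  (async/close! ch)
  ;; async/into collects the remaining values once the channel is closed
  (async/<!! (async/into [] ch)))

(def dropped-newest (fill-and-drain (async/chan (async/dropping-buffer 2)))) ;; => [1 2]
(def dropped-oldest (fill-and-drain (async/chan (async/sliding-buffer 2))))  ;; => [4 5]

;; with a fixed buffer, a 3rd blocking put would wait for a consumer;
;; offer! puts only if it can do so right now, so it reports the full buffer
(def fixed (async/chan 2))
(def offers (mapv #(async/offer! fixed %) [1 2 3])) ;; => [true true nil]

;; --- alts!!: the first ready channel wins ---
(def silent (async/chan))
(def outcome
  (let [[value port] (async/alts!! [silent (async/timeout 100)])]
    (if (= port silent) value :timed-out))) ;; => :timed-out
```

The dropping and sliding buffers never make the producer wait, which is why `fill-and-drain` can call the blocking `>!!` with no consumer at all.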

Going deeper:

Shared memory communication: var, ref, atoms & agents

Communication across threads has to be done carefully to avoid problems such as deadlocks. Classic problems like the dining philosophers illustrate why we need to be cautious about how threads share resources. Clojure has powerful primitives that make shared-memory communication safer.

Vars

The simplest construct, which we have probably already encountered:

(def my-var)

Vars hold global values. Every thread sees a var’s root value (the value given by def). A dynamic var (declared with ^:dynamic) can also be rebound with binding, and that new value is seen only by the thread that rebound it: other threads keep seeing the root value.

Key takeaway: a var is not a way to share a mutable resource between multiple threads.
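A sketch of that per-thread behaviour, assuming a var declared ^:dynamic (binding refuses to rebind non-dynamic vars); `*greeting*` and `read-from-plain-thread` are hypothetical names. The helper deliberately uses a plain java.lang.Thread, because future conveys the caller’s dynamic bindings to the new thread and would hide the effect. The last lines show the contrast with alter-var-root, which changes the root value for every thread.

```clojure
;; ^:dynamic is required for `binding` to accept the var
(def ^:dynamic *greeting* "hello")

(defn read-from-plain-thread
  "Hypothetical helper: reads *greeting* on a new java.lang.Thread,
  which does not inherit the caller's dynamic bindings."
  []
  (let [result (promise)
        t      (Thread. (fn [] (deliver result *greeting*)))]
    (.start t)
    (.join t)
    @result))

;; thread-local rebinding: only the current thread sees "bonjour"
(def observed
  (binding [*greeting* "bonjour"]
    {:this-thread  *greeting*
     :other-thread (read-from-plain-thread)}))
;; observed => {:this-thread "bonjour", :other-thread "hello"}

;; changing the root value, by contrast, is visible to every thread
(alter-var-root #'*greeting* (constantly "hi"))
(def after-root-change (read-from-plain-thread)) ;; => "hi"
```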

Tips: The full documentation for a var

Atoms and Agents

Atoms and agents make sharing resources across threads safe. They are similar in every respect except how they deal with asynchrony.

;; --------------------------------------------------
;; Create & Initialize an atom and an agent
;; --------------------------------------------------
;; create an atom and initialize its value to "state-0"
(def my-atom (atom "state-0"))

;; create an agent and initialize its value to "state-0"
(def my-agent (agent "state-0"))
;; --------------------------------------------------
;; Read the value an atom/agent holds
;; => dereference an atom/agent
;; --------------------------------------------------
(println @my-atom) ;; prints: state-0

(println @my-agent) ;; prints: state-0
;; --------------------------------------------------
;; Track mutations happening in an entity
;; => we'll simply print the value held by our atom & agent.
;; --------------------------------------------------

;; for agents
(add-watch my-agent :my-listener (fn [key watched old-state new-state] (println "> new agent state: " new-state)))

;; for atoms
(add-watch my-atom :my-listener (fn [key watched old-state new-state] (println "> new atom state: " new-state)))
;; ------------------------------------
;; Change the value pointed by an atom/agent
;; => Mutate an agent and an atom
;; ------------------------------------

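;; for an agent: the update is applied later, on an agent thread
(send my-agent (fn [_] "new-value"))
;; prints: > new agent state:  new-value

;; for an atom: the update is applied right away, on the current thread
(swap! my-atom (fn [_] "new-value"))
;; prints: > new atom state:  new-value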

As you can see so far, agents and atoms behave the same way. You update them by providing a function that receives the current value and returns the new value the agent/atom will hold. The only difference is the name of the function used to update them: send for an agent and swap! for an atom.
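Since the update function receives the current value, concurrent updates build on each other instead of overwriting one another. A minimal sketch, with hypothetical `counter` and `tally` names: 100 futures increment one atom, then extra arguments are passed through swap! and send, which call the function with the current value first.

```clojure
(def counter (atom 0))

;; 100 futures (other threads) increment the same atom concurrently;
;; swap! applies `inc` to the current value and retries if another thread won the race
(let [workers (doall (repeatedly 100 #(future (swap! counter inc))))]
  (run! deref workers)) ;; wait for every future to finish

(def after-increments @counter) ;; => 100

;; extra arguments come after the current value: this calls (+ 100 10)
(def after-add (swap! counter + 10)) ;; => 110

;; the agent equivalent is queued; await waits for the actions sent so far
(def tally (agent 0))
(send tally + 10)
(await tally)
(def tally-value @tally) ;; => 10
```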

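The difference between an agent and an atom is the following:

  • an agent is a resource that can be shared across threads in an asynchronous manner: send returns immediately and the update function runs later, on another thread
  • an atom is a resource that can be shared across threads in a synchronous manner: swap! runs the update function on the calling thread and returns once the new value is set

In other words, updating an atom can block your thread; updating an agent can’t.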

(defn create-new-value
  "Always returns the same value, but first sleeps 5 seconds, blocking whichever thread runs it"
  [_]
  (Thread/sleep 5000)
  "async-value")

;; ATOM
(do
  (swap! my-atom create-new-value)
  (println @my-atom))
;; prints after 5 seconds:
;; > new atom state:  async-value
;; async-value

;; AGENT
(do
  (send my-agent create-new-value)
  (println @my-agent))
;; prints immediately:
;; new-value
;; prints 5 seconds later:
;; > new agent state:  async-value

What we should note here is:

  • swap! blocked the main thread until the update function completed, so the dereference saw the new value
  • send didn’t block the main thread: the dereference returned the previous value, and the agent was updated once the function completed.

Rich Hickey demonstrated agents with a well-known ant colony simulation. If you think about it, it makes a lot of sense: we want to simulate each ant as a separate entity that behaves not sequentially (one after the other) but concurrently, in a non-blocking way (asynchronously).

A little secret about agents: we as programmers have two ways to update their value. So far we’ve seen the send function, but there’s another one, send-off, which is used in the same way. The only difference is the thread pool the update runs on: send uses a fixed-size pool (sized from the number of CPU cores), which avoids the overhead of creating new threads, while send-off uses a pool that grows as needed, creating a new thread when none is idle. The key takeaway here is:

  • use send for CPU-bound operations, so that they don’t hold the fixed pool of threads managed by Clojure for long
  • use send-off for operations that aren’t bound by the CPU (network requests and other I/O operations).
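The rule of thumb above can be sketched as follows. This is a minimal sketch, not a real fetcher: `fetch-page` is a hypothetical stand-in that sleeps to simulate network latency, and `page-cache`, `visit-stats` and `count-visit` are also hypothetical names. await blocks the calling thread until the actions dispatched so far to the given agents have been applied.

```clojure
;; hypothetical stand-in for a network call: sleeps to simulate I/O latency
(defn fetch-page [cache url]
  (Thread/sleep 200)
  (assoc cache url (str "<html>" url "</html>")))

;; cheap, CPU-only update
(defn count-visit [stats url]
  (update stats url (fnil inc 0)))

(def page-cache (agent {}))
(def visit-stats (agent {}))

;; blocking action: send-off, so it does not occupy the fixed-size send pool
(send-off page-cache fetch-page "/home")
;; CPU-bound action: send
(send visit-stats count-visit "/home")

;; block this thread until both actions above have been applied
(await page-cache visit-stats)

(def cached @page-cache) ;; => {"/home" "<html>/home</html>"}
(def stats @visit-stats) ;; => {"/home" 1}
```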

What we haven’t talked about here:

  • How to handle errors occurring while updating an agent/atom
  • Validators, or how you can ensure an atom always remains valid from the perspective of your application
  • The lifecycle of an agent

Going further

Refs

Refs have an interesting property: updates to them behave like a database transaction, but in memory. This is called STM (software transactional memory).

The idea is to let several refs change together as a single unit: if two transactions conflict, one of them is retried, and if an error occurs, the changes are rolled back and none of them are kept.

Let’s simulate a money transfer from Paul’s account to Marie’s account.

(def paul-account (ref 300))
(def marie-account (ref 1000))

(defn log-account
  "A debug function showing the money in Paul's and Marie's accounts"
  []
  (println "> account paul: " @paul-account)
  (println "> account marie: " @marie-account))


(defn transfer-money
  "Make a transfer"
  [from-account to-account amount]
  (dosync
   ;; side effect inside a transaction: it may print more than once if the transaction retries
   (println "> doing a transfer")
   (alter from-account - amount)
   (alter to-account + amount)))

(do
  (transfer-money paul-account marie-account 400)
  (log-account))
;; display:
;; > doing a transfer
;; > account paul:  -100
;; > account marie:  1400
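The transfer above commits both alter calls together. The rollback side can be sketched with two hypothetical refs, `checking` and `savings`, and a hypothetical `failing-transfer!` that throws after both alter calls but before the transaction commits: the exception propagates out of dosync and neither change is kept.

```clojure
(def checking (ref 300))
(def savings (ref 1000))

(defn failing-transfer!
  "Hypothetical transfer that throws after both alters, before the transaction commits."
  [amount]
  (dosync
    (alter checking - amount)
    (alter savings + amount)
    (throw (ex-info "simulated failure" {:amount amount}))))

(def error-message
  (try
    (failing-transfer! 400)
    (catch clojure.lang.ExceptionInfo e
      (.getMessage e))))
;; error-message => "simulated failure"

;; neither alter was committed
(def balances [@checking @savings]) ;; => [300 1000]
```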

There are two ways to update a ref’s state within a transaction: alter and commute. They take the same arguments, but they do different things on your behalf:

  • alter is the strict, safest option: it applies the function you pass to the ref’s value, and if another transaction commits a change to that ref before this one commits, the whole transaction is retried
  • commute is less strict: when another transaction changes the ref in the meantime, it does not force a retry. Instead, the function is applied again to the most recently committed value when the transaction commits (conflicts on refs updated with alter still cause a retry). For cases where ordering doesn’t matter, such as incrementing a counter, commute is good enough and avoids needless retries
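Both can be mixed in one transaction. A minimal sketch with hypothetical refs `account-a`, `account-b` and `transfer-count`: balances are updated with alter because the result depends on ordering, while the counter uses commute because increments can be applied in any order. 50 concurrent transfers keep the total balance constant.

```clojure
(def account-a (ref 300))
(def account-b (ref 1000))
(def transfer-count (ref 0))

(defn transfer!
  "Hypothetical transfer that also counts completed transfers."
  [from to amount]
  (dosync
    ;; balances: alter, so a conflicting commit by another transaction forces a retry
    (alter from - amount)
    (alter to + amount)
    ;; counter: increments commute, so `inc` is re-applied to the latest
    ;; committed value at commit time instead of causing a retry
    (commute transfer-count inc)))

;; 50 concurrent transfers of 1 from account-a to account-b
(let [workers (doall (repeatedly 50 #(future (transfer! account-a account-b 1))))]
  (run! deref workers))

(def final-state [@account-a @account-b @transfer-count]) ;; => [250 1050 50]
```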