This guide covers:

  • Clojure's identity/value separation
  • Clojure reference types and their concurrency semantics: atoms, refs, agents, vars
  • Dereferencing
  • Delays, futures and promises
  • Watches and validators
  • How to use java.util.concurrent from Clojure
  • Other approaches to concurrency available on the JVM
  • Other topics related to concurrency

This work is licensed under a Creative Commons Attribution 3.0 Unported License (including images & stylesheets). The source is available on GitHub.

What Version of Clojure Does This Guide Cover?

This guide covers Clojure 1.11.

Before You Read This Guide

This is one of the most hardcore guides of the entire Clojure documentation project. It describes concepts that are simple but may seem foreign at first. These concepts are some of the key points of Clojure design. Understanding them may take some time for folks without a concurrent programming background. Don't let this learning curve discourage you.

If some parts are not clear, please ask for clarification in the #clojure-doc channel on Slack (self-signup at clojurians.net) or file an issue on GitHub. We will work hard on making this guide easy to follow with edits and images to illustrate the concepts.

Introduction and Terminology

Before we get to the Clojure features related to concurrency, let's lay a foundation and briefly cover some terminology.

Term | Definition This Guide Uses
Concurrency | When multiple threads are making progress, whether it is via time-slicing or parallelism
Parallelism | A condition that arises when at least two threads are executing simultaneously, e.g., on multiple cores or CPUs.
Shared State | When multiple threads of execution need to mutate (modify) one or more pieces of program state (e.g., variables, identities)
Mutable Data Structures | Data structures that, when changed, are updated "in place"
Immutable Data Structures | Data structures that, when changed, produce new data structures (copies), possibly with optimizations such as internal structural sharing
Concurrency Hazards | Conditions that occur in concurrent programs that prevent program from being correct (behaving the way its authors intended).
Shared Mutable State | When shared state is made of mutable data structures. A ripe ground for concurrency hazards.

There are many concurrency hazards. Some of the most common and well known are:

Concurrency Hazard | Brief Description
Race Condition | A condition when the outcome is dependent on timing or relative ordering of events
Deadlock | When two or more threads are waiting on each other to finish or release a shared resource, thus waiting forever and not making any progress
Livelock | When two or more threads are technically performing computation but not doing any useful work (not making progress), for example, because they endlessly pass a piece of data to each other but never actually process it
Starvation | When a thread is not given regular access to a shared resource and cannot make progress.

These hazards are not exclusive to threads and can happen with OS processes, runtime processes and any other execution processes. They are also not specific to a particular runtime or VM (e.g., the JVM) or programming language. Admittedly, some languages make it significantly easier to write correct, safe concurrent programs, but none are completely immune to concurrency hazards. More often than not, concurrency hazards are algorithmic problems; languages just encourage or discourage certain practices and techniques.

Thread-safe code is code that is always executed correctly and does not suffer from concurrency hazards even when executed concurrently from multiple threads.

Overview

One of Clojure's design goals was to make concurrent programming easier. The thinking is that as modern CPUs add more and more cores and the number of CPUs increases as well, the biggest gains in application throughput will come from making use of those resources.

The key design decision was making Clojure data structures immutable (persistent) and separating the concepts of identity and value. The importance of immutability cannot be over-emphasized: immutable values can be safely shared between threads, eliminate many concurrency hazards, and ultimately make it easier for developers to reason about their programs.

However, a language that only has immutable data structures and no way to change (mutate) program state is not very useful. The identity/value separation makes state mutations (e.g., incrementing a counter or adding an element to a list) possible in ways that have known guarantees with respect to concurrency. This separation largely eliminates the need for explicit use of locks, which is possible in Clojure but typically not necessary.

To put it another way: "changing variables" in Clojure happens differently from many other languages; in ways that are predictable from the concurrency perspective and which eliminate many concurrency hazards.

Next, let's take a closer look at the identity/value separation.

Identity/Value Separation ("on State and Identity")

In Clojure, values are immutable. They never change. For example, a number is a value. A map {:language "Clojure"} is a value. A vector with 3 elements is a value.

When you attempt to modify a value (a data structure), a new value is produced instead. These are known as persistent data structures (the word "persistent" has nothing to do with storing data on disk).
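As a minimal sketch of this behavior (the names below are made up for illustration), adding to a vector or a map returns a new value and leaves the original untouched:

```clojure
;; Hypothetical names, used only for illustration.
(def original [1 2 3])
(def extended (conj original 4))   ; conj returns a new vector

original
;; ⇒ [1 2 3]
extended
;; ⇒ [1 2 3 4]

(def settings {:language "Clojure"})
(def updated (assoc settings :version "1.11"))   ; assoc returns a new map

settings
;; ⇒ {:language "Clojure"}
updated
;; ⇒ {:language "Clojure", :version "1.11"}
```

Because the original values never change, any thread that holds a reference to them can keep reading them safely.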

An identity is a named entity (e.g., a list of active chat group members or a counter) that changes over time and at any given moment references a value. For example, the current value of a counter may be 42. After incrementing it, the value is 43 but it is still the same counter --- the same identity. This is different from, say, Java or Ruby, where variables serve as identities that (typically) point to a mutable value and which are modified in place.
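The counter described above can be sketched with an atom, one of the reference types covered later in this guide. The names are hypothetical; the point is only that a value read earlier stays the same while the identity moves on to a new value:

```clojure
;; counter is the identity; 42 and 43 are the values it refers to over time.
(def counter (atom 42))

(def before @counter)   ; capture the current value
(swap! counter inc)     ; the identity now refers to a new value
(def after @counter)

before
;; ⇒ 42
after
;; ⇒ 43
```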

[Figure: identity and value]

See also Values and Change: Clojure’s approach to Identity and State on clojure.org.

Identities in Clojure can be of several types, known as reference types.

Clojure Reference Types

Overview

In Clojure's world view, concurrent operations can be roughly classified as coordinated or uncoordinated, and synchronous or asynchronous. Different reference types in Clojure have their own concurrency semantics and cover different kinds of operations:

             | Coordinated | Uncoordinated
Synchronous  | Refs        | Atoms
Asynchronous | (none)      | Agents
Coordinated: An operation that depends on cooperation from other operations (or at least on other operations not interfering with it) in order to produce correct results. For example, a banking operation that involves more than one account.
Uncoordinated: An operation that does not affect other operations in any way. For example, when downloading 100 Web pages concurrently, each operation does not affect the others.
Synchronous: An operation in which the caller's thread waits, blocks, or sleeps until it has access to a given resource or context.
Asynchronous: An operation that can be started or scheduled without blocking the caller's thread.

One more reference type, vars, supports dynamic scoping and thread-local storage.

Atoms

Atoms are references that change atomically (changes become immediately visible to all threads, changes are guaranteed to be synchronized by the JVM). If you come from a Java background, atoms are basically atomic references from java.util.concurrent with a functional twist to them. Atoms are identities that implement synchronous, uncoordinated, atomic updates.

Let's jump right in and demonstrate how atoms work using an example. We know that Clojure data structures are immutable by default. Adding an element to a collection really produces a new collection. In that case, how does one keep a shared list (say, of active connections to a server or recently crawled URLs) and mutate it in a thread-safe manner? We will demonstrate how to accomplish this with an atom.

To create an atom, use the clojure.core/atom function. Its argument will serve as the atom's initial value:

(def currently-connected (atom []))

The line above defines currently-connected as an atom whose initial value is an empty vector. To access an atom's value, use clojure.core/deref or the @ reader form:

(def currently-connected (atom []))

@currently-connected
;; ⇒ []
(deref currently-connected)
;; ⇒ []
currently-connected
;; ⇒ #object[clojure.lang.Atom 0x7499eac7 {:status :ready, :val []}]

As the returned values demonstrate, the atom itself is a reference. To access its current value, you dereference it. Dereferencing will be covered in more detail later in this guide. For now, it is sufficient to say that dereferencing returns the current value of an atom. (Other Clojure reference types as well as a few specialized data structures can be dereferenced as well.)

Locals can be atoms, too:

(let [xs (atom [])]
  @xs)
;; ⇒ []

Now to the most interesting part: adding elements to the collection.

To mutate an atom, we can use clojure.core/swap!.

swap! takes an atom, a function, and optionally some other arguments. It swaps the current value of the atom for the return value of calling the function with the current value of the atom and those arguments:

(swap! currently-connected conj "chatty-joe")
;; ⇒ ["chatty-joe"]
currently-connected
;; ⇒ #object[clojure.lang.Atom 0x7499eac7 {:status :ready, :val ["chatty-joe"]}]
@currently-connected
;; ⇒ ["chatty-joe"]

To demonstrate this graphically, initial atom state looks like this:

[Figure: Atom state 1]

and then we mutated it with swap!:

[Figure: Atom state 2]

For readers familiar with the atomic types from the java.util.concurrent.atomic package, this should sound very familiar. The main difference is that instead of setting a value directly, atoms are mutated with a function. This is both because Clojure is a functional language and because with this approach, clojure.core/swap! can retry the operation safely when another thread changes the atom first. It follows that the function you provide to swap! should be pure (free of side effects), since it may be called more than once.
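To make the retry behavior concrete, here is a simplified sketch of a swap!-like function built on clojure.core/compare-and-set!. It is not the actual implementation of swap!, and the names swap-sketch! and visits are made up for this illustration:

```clojure
;; A simplified sketch of the read/compute/compare-and-set loop.
;; Not the real implementation of clojure.core/swap!.
(defn swap-sketch!
  [a f & args]
  (loop []
    (let [old-val @a
          new-val (apply f old-val args)]
      ;; compare-and-set! succeeds only if the atom still holds old-val.
      ;; If another thread changed it in the meantime, compute again.
      (if (compare-and-set! a old-val new-val)
        new-val
        (recur)))))

(def visits (atom 0))

(swap-sketch! visits + 5)
;; ⇒ 5
@visits
;; ⇒ 5
```

The loop shows why the update function may run more than once: each failed compare-and-set! throws away the computed value and starts over from the atom's latest value.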

Occasionally you will need to mutate the value of an atom the same way you would with atomic references in Java: by setting them to a specific value. This is what clojure.core/reset! does. It takes an atom and the new value:

@currently-connected
;; ⇒ ["chatty-joe"]
(reset! currently-connected [])
;; ⇒ []
@currently-connected
;; ⇒ []

reset! may be useful in test suites to reset an atom's state between test executions, but it should be used sparingly in your implementation code. Consider using swap! first.

By using an impure swap! function, we can see how it can retry when there is contention:

(let [a (atom 0)
      q (mapv #(future (swap! a (fn [n] (println %) (inc n))))
              (range 10))]
  (mapv deref q))

This will return a vector of 10 values, 1 to 10, in some arbitrary order, and it will print the numbers 0 to 9 at least once each -- but probably print several of them more than once, indicating that swap! was retried.

Summary and Use Cases

Atoms are the most commonly used concurrency feature in Clojure. They cover many use cases, are very fast, and let developers avoid explicit locking. It's fair to say that when you need an uncoordinated reference type (i.e., you do not need Software Transactional Memory), the rule of thumb is, "start with an atom, then see".

It is not uncommon to initialize an atom in a local and then return it from the function and share a piece of state with other functions and/or threads.
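A small sketch of that pattern, assuming a hypothetical function start-workers that creates an atom in a local, shares it with several threads (here, futures, which are covered later in this guide), and returns it to the caller:

```clojure
(defn start-workers
  "Starts n workers that each record their id in a shared atom.
  Returns the atom once all workers are done. Hypothetical example."
  [n]
  (let [seen    (atom #{})
        workers (mapv (fn [id] (future (swap! seen conj id)))
                      (range n))]
    (run! deref workers)   ; wait for every worker to finish
    seen))

(def seen (start-workers 5))

(sort @seen)
;; ⇒ (0 1 2 3 4)
```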

Agents

Agents are references that are updated asynchronously: updates happen at a later, unknown point in time, in a thread pool. Agents are identities that implement uncoordinated, asynchronous updates.

A small but useful example of using an agent is as a counter. For example, suppose we want to track how often page downloads in a Web crawler respond with 40x and 50x status codes. The simplest version can look like this:

(def errors-counter (agent 0))
;; ⇒ #'user/errors-counter
errors-counter
;; ⇒ #object[clojure.lang.Agent 0x45e6d1e0 {:status :ready, :val 0}]
@errors-counter
;; ⇒ 0
(deref errors-counter)
;; ⇒ 0

One can immediately make several observations: just like atoms, agents are references. To get the current value of an agent, we need to dereference it using clojure.core/deref or the @agent reader macro.

To mutate an agent, we use clojure.core/send and clojure.core/send-off:

@errors-counter
;; ⇒ 0
(send errors-counter inc)
;; ⇒ #object[clojure.lang.Agent 0x45e6d1e0 {:status :ready, :val 0}]
@errors-counter
;; ⇒ 1

;; 10 is an additional parameter. The + function will be invoked as `(+ @errors-counter 10)`.
(send errors-counter + 10)
;; ⇒ #object[clojure.lang.Agent 0x45e6d1e0 {:status :ready, :val 1}]
@errors-counter
;; ⇒ 11

send and send-off are largely similar. The difference is in how they are implemented. send uses a fixed-size thread pool, so using blocking operations with it won't yield good throughput. send-off uses a growing thread pool, so blocking operations are not a problem for it as long as there are resources available to the JVM to create and run all the threads. On a 4-8 GB machine with 4 cores and stock OS settings you can expect up to a couple of thousand I/O-bound threads to work without running the system out of kernel resources.
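As a rough illustration of choosing between the two, the sketch below uses send for a quick computation and send-off for an action that blocks. The function slow-fetch is a stand-in invented for this example (it only sleeps), and clojure.core/await is used to wait for the queued actions to finish:

```clojure
(def total (agent 0))    ; updated with a quick, CPU-bound function
(def pages (agent []))   ; updated with a function that blocks

(defn slow-fetch
  "Hypothetical stand-in for a blocking download; it only sleeps."
  [url]
  (Thread/sleep 50)
  (str "body of " url))

(send total + 10)
(send-off pages (fn [acc] (conj acc (slow-fetch "https://example.com/"))))

;; await blocks until the actions sent so far from this thread have run
(await total pages)

@total
;; ⇒ 10
@pages
;; ⇒ ["body of https://example.com/"]
```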

Using Custom Executors With Agents

Agents can be used (and abused) for arbitrary code execution in a thread pool. Because the default thread pool Clojure maintains will not be a good fit for all use cases, Clojure provides a function that lets you control what thread pool (executor) is used by clojure.core/send: clojure.core/set-agent-send-executor!.

(import java.util.concurrent.Executors)

(set-agent-send-executor! (Executors/newFixedThreadPool 32))
;; clojure.core/send now will use the fixed size thread pool with 32 threads

The default thread pool used by clojure.core/send has a size of the number of available CPU cores plus 2.

clojure.core/set-agent-send-off-executor! is a similar function that controls what thread pool clojure.core/send-off will use.

Finally, another useful function is clojure.core/send-via which is like clojure.core/send but lets you specify an executor to be used on a case-by-case basis:

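(import java.util.concurrent.Executors)

(def custom-pool (Executors/newFixedThreadPool 32))
;; stream-agent is not defined elsewhere in this guide; this definition is
;; an assumption added so that the example is self-contained
(def stream-agent (agent 0))
;; just like clojure.core/send but will use custom-pool instead
;; of an internally maintained one
(send-via custom-pool stream-agent inc)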

Agents and Software Transactional Memory

We haven't introduced refs and the concept of Software Transactional Memory yet. It will be covered later in this guide. Here it's sufficient to mention that agents are STM-aware and can be safely used inside transactions.
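As a small preview, and assuming the hypothetical names stock and audit-log, the sketch below sends to an agent from inside a transaction. Actions sent inside dosync are held until the transaction commits, so a retried transaction does not dispatch them more than once:

```clojure
(def stock (ref 10))
(def audit-log (agent []))

(dosync
  (alter stock dec)
  ;; this send is dispatched only when the transaction commits
  (send audit-log conj :item-sold))

;; wait for the agent to process the action
(await audit-log)

@stock
;; ⇒ 9
@audit-log
;; ⇒ [:item-sold]
```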

Agents and Error Handling

Functions that modify an agent's state will not always return successfully in the real world. Sometimes they will fail. For example:

@errors-counter
;; ⇒ 11
(send errors-counter / 0)
;; ⇒ #object[clojure.lang.Agent 0x45e6d1e0 {:status :ready, :val 11}]
;; evaluation will produce an exception

This puts the agent into the failed state. A failed agent will re-raise the exception that caused it to fail every time a state change is attempted:

(send errors-counter / 0)
;; Execution error (ArithmeticException) at java.util.concurrent.ThreadPoolExecutor/runWorker (ThreadPoolExecutor.java:1144).
;; Divide by zero
(send errors-counter inc)
;; Execution error (ArithmeticException) at java.util.concurrent.ThreadPoolExecutor/runWorker (ThreadPoolExecutor.java:1144).
;; Divide by zero

To access the exception that occurred during the agent's state mutation, use clojure.core/agent-error:

(send errors-counter / 0)
;; Execution error (ArithmeticException) at java.util.concurrent.ThreadPoolExecutor/runWorker (ThreadPoolExecutor.java:1144).
;; Divide by zero
(agent-error errors-counter)
;; #error {
;;  :cause "Divide by zero"
;;  :via
;;  [{:type java.lang.ArithmeticException
;;    :message "Divide by zero"
;;    :at [clojure.lang.Numbers divide "Numbers.java" 190]}]
;;  ...}

agent-error returns the exception. Agents can be restarted with clojure.core/restart-agent, which takes an agent and a new initial value:

(restart-agent errors-counter 0)
;; ⇒ 0
(send errors-counter + 10)
;; ⇒ #object[clojure.lang.Agent 0x45e6d1e0 {:status :ready, :val 0}]
@errors-counter
;; ⇒ 10

If you'd prefer an agent to ignore exceptions instead of going into the failed state, clojure.core/agent takes an option that controls this behavior: :error-mode. Because completely ignoring errors is rarely a good idea, when the error mode is set to :continue you should also pass an error handler function:

(def errors-counter (agent 0
                           :error-mode    :continue
                           :error-handler (fn [failed-agent exception]
                                            (println (ex-message exception)))))
;; ⇒ #'user/errors-counter
(send errors-counter inc)
;; ⇒ #object[clojure.lang.Agent 0x25a94b55 {:status :ready, :val 0}]
(send errors-counter inc)
;; ⇒ #object[clojure.lang.Agent 0x25a94b55 {:status :ready, :val 1}]
(send errors-counter / 0)
;; output: Divide by zero
;; ⇒ #object[clojure.lang.Agent 0x25a94b55 {:status :ready, :val 2}]
(send errors-counter inc)
;; ⇒ #object[clojure.lang.Agent 0x25a94b55 {:status :ready, :val 2}]
@errors-counter
;; ⇒ 3

The handler function takes two arguments: an agent and the exception that occurred.

You can also change the error mode and error handler programmatically, using clojure.core/set-error-mode! and clojure.core/set-error-handler!.
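A minimal sketch of configuring an existing agent this way. The names events and seen-errors are hypothetical, and the handler simply records exception messages in an atom:

```clojure
(def events (agent []))
(def seen-errors (atom []))

;; install a handler and switch the agent to :continue after creation
(set-error-handler! events
                    (fn [_failed-agent exception]
                      (swap! seen-errors conj (ex-message exception))))
(set-error-mode! events :continue)

(send events conj :started)
(send events (fn [_] (throw (ex-info "boom" {}))))
(send events conj :still-running)
(await events)

@events
;; ⇒ [:started :still-running]
@seen-errors
;; ⇒ ["boom"]
```

The failing action is reported to the handler and skipped, and the agent keeps processing later actions.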

Summary and Use Cases

Agents are asynchronously updated references. They can be used for anything that does not require strict consistency for reads:

  • Counters (e.g. message rates in event processing)
  • Collections (e.g. recently processed events)

Agents can be used for offloading arbitrary computations to a thread pool, and they can provide the same flexibility as JDK executors (thread pools).

Refs

Refs are the only coordinated reference type Clojure has. They help ensure that multiple identities can be modified concurrently within a transaction:

  • Either all refs are modified or none are
  • No race conditions between involved refs
  • No possibility of deadlocks between involved refs

Refs provide the A, C, and I of ACID: atomicity, consistency, and isolation, but not durability. Refs are backed by Clojure's implementation of software transactional memory (STM).

To instantiate a ref, use the clojure.core/ref function:

(def account-a (ref 0))
;; ⇒ #'user/account-a
(def account-b (ref 0))
;; ⇒ #'user/account-b

Like atoms and agents covered earlier, to get the current value of a ref, use clojure.core/deref or the "@" reader macro:

(deref account-a)
;; ⇒ 0
@account-b
;; ⇒ 0

Refs are for coordinated concurrent operations and so it does not make much sense to use a single ref (in that case, an atom would be sufficient). Refs are modified in a transaction in the clojure.core/dosync body.

clojure.core/dosync starts a transaction, performs all modifications and commits the changes. If a concurrently running transaction modifies a ref involved in the current transaction before the current transaction commits, the current transaction will be retried to make sure that the most recent value of the modified ref is used.
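The sketch below forces one retry so it can be observed. It is deliberately artificial: it counts attempts with an atom and blocks on promises inside the transaction, both of which are side effects you would not put in real transactional code. All names are hypothetical:

```clojure
(def balance (ref 0))
(def attempts (atom 0))            ; impure on purpose: counts transaction attempts
(def first-read-done (promise))
(def other-tx-committed (promise))

(def slow-tx
  (future
    (dosync
      (swap! attempts inc)
      (let [seen @balance]
        (deliver first-read-done true)   ; signal that balance has been read
        @other-tx-committed              ; wait until the other transaction commits
        (ref-set balance (inc seen))))))

;; once the slow transaction has read balance, commit a competing change
@first-read-done
(dosync (alter balance + 10))
(deliver other-tx-committed true)

@slow-tx
;; ⇒ 11
@attempts
;; ⇒ 2
```

The first attempt read 0, but by the time it tried to write, balance had been changed by another transaction. The STM discarded that attempt and ran the body again, this time reading 10.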

TBD: a picture that visualizes retries and serializability.

alter

Refs are modified using clojure.core/alter which is very similar to clojure.core/swap! in the arguments it takes: a ref, a function that takes an old value and returns a new value of the ref, and any number of optional arguments to pass to the function.

In the following example, two refs are initialized at 1000, representing two bank accounts. Then 100 units are transferred from one account to the other, atomically:

(def account-a (ref 1000))
;; ⇒ #'user/account-a
(def account-b (ref 1000))
;; ⇒ #'user/account-b

(dosync
  ;; will be executed as (+ @account-a 100)
  (alter account-a + 100)
  ;; will be executed as (- @account-b 100)
  (alter account-b - 100))
;; ⇒ 900
@account-a
;; ⇒ 1100
@account-b
;; ⇒ 900

Conflicts and Retries

TBD: explain transaction conflicts, demonstrate transaction retries

commute

With a high number of concurrently running transactions, the overhead of retries can become noticeable. Some modifications, however, can be applied in any order. Clojure's STM implementation acknowledges this fact and provides an alternative way to modify refs: clojure.core/commute. commute must only be used for operations that commute in the mathematical sense: the order can be changed without affecting the result. For example, addition is commutative (1 + 10 produces the same result as 10 + 1) but subtraction is not (1 − 10 does not equal 10 − 1).

clojure.core/commute has the same signature as clojure.core/alter:

@account-a
;; ⇒ 1100
@account-b
;; ⇒ 900
(dosync
  (commute account-a + 300)
  (commute account-b + 300))
;; ⇒ 1200
@account-a
;; ⇒ 1400
@account-b
;; ⇒ 1200

Note that a change made to a ref by commute will never cause a transaction to retry. commute does not cause transaction conflicts.
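A typical fit for commute is a shared counter that many threads increment, since increments can be applied in any order. A small sketch, with hypothetical names and futures (covered later in this guide) standing in for worker threads:

```clojure
(def hits (ref 0))

;; 50 concurrent transactions, each incrementing the same ref
(def workers
  (mapv (fn [_] (future (dosync (commute hits inc))))
        (range 50)))

;; wait for all of them to finish
(run! deref workers)

@hits
;; ⇒ 50
```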

Using Refs With Clojure Data Structures

TBD: demonstrate more complex changes, e.g., to game characters

Limitations of Refs

Software transactional memory is a powerful but highly specialized tool. Because transactions can be retried, you must only use pure functions with STM. I/O operations cannot be undone by the runtime and very often are not idempotent.

Structuring your application code as a pure core with edge code that interacts with the user or other services (performing I/O operations and other side-effects) helps with this. In that case, the pure core can use STM without issues. This is often referred to as functional core, imperative shell.

For example, in a Web or network server, the code that handles incoming requests is the edge code: it does I/O. The pure core is then called to modify server state and do any necessary calculations, and it returns a result that the edge code sends back to the client:

TBD: a picture to demonstrate

Unlike some other languages and runtimes (for example, Haskell), Clojure will not prevent you from doing I/O in transactions. It is left as a matter of discipline on the programmer's part. It does provide a helper function, though: clojure.core/io! will raise an exception if there is an STM transaction running and has no effect otherwise.

First, an example with pure code:

(io!
  ;; pure code, clojure.core/io! has no effect
  (reduce + (range 0 100)))
;; ⇒ 4950

And an example that invokes functions that are guarded with clojure.core/io! in an STM transaction:

(defn render-results
  "Prints results to the standard output"
  []
  (io!
    (println "Results:")
    (comment ...)))
;; ⇒ #'user/render-results
(dosync
  (alter account-a + 100)
  (alter account-b - 100)
  (render-results))
;; throws java.lang.IllegalStateException, "I/O in transaction"

Summary and Use Cases

TBD

Vars

Vars are the reference type you are already familiar with. You define them via the def special form:

(def url "https://en.wikipedia.org/wiki/Margarita")

Functions defined via defn are also stored in vars. Vars can be dynamically scoped. They have root bindings that are initially visible to all threads. When defining a var with def, you define a var that has only a root binding, so its value will be the same no matter what thread you use it from:

(def url "https://en.wikipedia.org/wiki/Margarita")
;; ⇒ #'user/url
(.start (Thread. (fn []
                   (println (format "url is %s" url)))))
;; outputs "url is https://en.wikipedia.org/wiki/Margarita"
;; ⇒ nil
(.start (Thread. (fn []
                   (println (format "url is %s" url)))))
;; outputs "url is https://en.wikipedia.org/wiki/Margarita"
;; ⇒ nil

Dynamic Scoping and Thread-local Bindings

To temporarily change a var's value, we need to make the var dynamic by adding :dynamic true to its metadata and then use clojure.core/binding:

(def ^:dynamic *url* "https://en.wikipedia.org/wiki/Margarita")
;; ⇒ #'user/*url*
(println (format "*url* is now %s" *url*))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Margarita"

(binding [*url* "https://en.wikipedia.org/wiki/Cointreau"]
  (println (format "*url* is now %s" *url*)))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Cointreau"
;; ⇒ nil

Note that, by convention, vars which are supposed to or may be dynamically scoped are named with leading and trailing asterisks * (often referred to as "earmuffs").

In the example above, binding temporarily changed the var's current value to a different URL. That change was visible only in the thread that evaluated the binding form. What makes vars interesting from the concurrency point of view is that their bindings can be thread-local (if you are familiar with thread-local variables in Java or Ruby, this is very similar and serves largely the same purpose). To demonstrate, let's change the example to spin up 3 threads and rebind the var in each of them:

(def ^:dynamic *url* "https://en.wikipedia.org/wiki/Margarita")
;; ⇒ #'user/*url*
(println (format "*url* is now %s" *url*))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Margarita"
;; ⇒ nil
(.start (Thread. (fn []
          (binding [*url* "https://en.wikipedia.org/wiki/Cointreau"]
            (println (format "*url* is now %s" *url*))))))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Cointreau"
;; ⇒ nil
(.start (Thread. (fn []
                   (binding [*url* "https://en.wikipedia.org/wiki/Guignolet"]
                     (println (format "*url* is now %s" *url*))))))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Guignolet"
;; ⇒ nil
(.start (Thread. (fn []
                   (binding [*url* "https://en.wikipedia.org/wiki/Apéritif"]
                     (println (format "*url* is now %s" *url*))))))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Apéritif"
;; ⇒ nil
(println (format "*url* is now %s" *url*))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Margarita"
;; ⇒ nil

As you can see, rebinding the var in different threads did not modify its root binding, which is the value the original thread still sees. In real-world cases, for example, it means that a multi-threaded Web crawler can store some crawling state specific to a particular thread in a var and not modify its initial (global) value.

How to Alter Var Root

Sometimes, however, modifying the root binding is necessary. This is done via clojure.core/alter-var-root which takes a var (not its value) and a function that takes the old var value and returns a new one:

*url*
;; ⇒ "https://en.wikipedia.org/wiki/Margarita"
(.start (Thread. (fn []
                   (alter-var-root (var user/*url*) (fn [_] "https://en.wikipedia.org/wiki/Apéritif"))
                   (println (format "*url* is now %s" *url*)))))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Apéritif"
;; ⇒ nil
*url*
;; ⇒ "https://en.wikipedia.org/wiki/Apéritif"

clojure.core/var is used to locate the var (user/*url* in our example executed in the REPL). Note that it finds the var itself (the reference, the "box"), not its value (what the var evaluates to). You can also use the reader macro #' to get a var: #'user/*url* which is a little less typing.

In the example above the function we use to alter var root ignores the current value and simply returns a predefined string:

(fn [_] "https://en.wikipedia.org/wiki/Apéritif")

Such functions are common enough for clojure.core to provide a convenience higher-order function called clojure.core/constantly. It takes a value and returns a function that, when executed, ignores all its parameters and returns that value. So, the example above would be more idiomatically written as:

*url*
;; ⇒ "https://en.wikipedia.org/wiki/Margarita"
(.start (Thread. (fn []
                   (alter-var-root #'user/*url* (constantly "https://en.wikipedia.org/wiki/Apéritif"))
                   (println (format "*url* is now %s" *url*)))))
;; outputs "*url* is now https://en.wikipedia.org/wiki/Apéritif"
;; ⇒ nil
*url*
;; ⇒ "https://en.wikipedia.org/wiki/Apéritif"

When is alter-var-root used in real world scenarios? Mostly for working in the REPL where you want some system state in a global var to be easily accessible but its value would normally be computed locally inside a function (such as -main when an application starts up).
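A sketch of that REPL-oriented pattern, assuming a hypothetical start-system function and a global var named system. Neither is part of Clojure; they only illustrate the shape of the code:

```clojure
;; Global var that makes the running system reachable from the REPL.
(def system nil)

(defn start-system
  "Hypothetical startup function; returns a map describing the system."
  [config]
  {:config config :status :running})

(defn -main [& _args]
  (let [sys (start-system {:port 8080})]
    ;; publish the locally computed value through the var's root binding
    (alter-var-root #'system (constantly sys))
    sys))

(-main)

(:status system)
;; ⇒ :running
```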

Summary and Use Cases

To summarize: vars can have dynamic scope. They have a root binding and can have thread-local bindings as well. As such, vars are good for storing pieces of program state that vary between threads but cannot be stored in a function local. alter-var-root is used to alter root binding of a var. It is done the functional way: by providing a function that takes the old var value and returns a new one.

To alter var root to a specific known value, use clojure.core/constantly.

Dereferencing

Earlier sections demonstrated the concept of dereferencing. Dereferencing means retrieving the current value of a reference (an atom, an agent, a ref, etc). To dereference a Clojure reference, use clojure.core/deref or the @reference reader macro:

(let [xs (atom [])]
  @xs)
;; ⇒ []

Besides atoms, agents, and refs, Clojure has several other concurrency-oriented data structures that can be dereferenced: delays, futures, and promises. They will be covered later in this guide.

Dereferencing Support For Data Types Implemented In Java

Clojure identifies data types that can be dereferenced by checking whether they implement the clojure.lang.IDeref interface. If that interface is not implemented, deref assumes the data type will implement java.util.concurrent.Future and calls .get on it.

It is possible to make custom data types implemented in Java support dereferencing by making them implement the clojure.lang.IDeref interface:

package clojure.lang;

public interface IDeref{
  Object deref();
}

This can be done to make data types implemented in Java look and feel more like built-in Clojure data types, or make it possible to pass said types to a function that expects its arguments to be dereferenceable.
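The fallback to java.util.concurrent.Future mentioned above can be seen by dereferencing a future returned by a plain JDK executor. This is a small sketch; the names pool, task, and jf are made up for the example:

```clojure
(import '(java.util.concurrent Callable Executors))

(def pool (Executors/newSingleThreadExecutor))

;; The type hint selects the Callable overload of submit,
;; since Clojure functions implement both Runnable and Callable.
(def jf
  (let [^Callable task (fn [] (+ 1 2))]
    (.submit pool task)))

;; jf does not implement IDeref, so deref falls back to calling .get on it
@jf
;; ⇒ 3

(.shutdown pool)
```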

Dereferencing Support for Data Types Implemented in Clojure

Clojure data types can be made dereferenceable by implementing clojure.lang.IDeref, for example using reify:

(def d (reify clojure.lang.IDeref (deref [this] "I'm a value!")))
;; ⇒ #'user/d
@d
;; ⇒ "I'm a value!"

Delays

In Clojure, a delay is a data structure whose body is evaluated the first time it is dereferenced. Subsequent dereferencing will use the cached value. Delays are instantiated with the clojure.core/delay macro.

In the following example a delay is used to calculate a timestamp that is later used as a cached value:

(def d (delay (System/currentTimeMillis)))
;; ⇒ #'user/d
d
;; ⇒ #object[clojure.lang.Delay 0x307e4c44 {:status :pending, :val nil}]
;; dereferencing causes the value to be realized, it happens only once
@d
;; ⇒ 1696717714262
@d
;; ⇒ 1696717714262
@d
;; ⇒ 1696717714262
d
;; ⇒ #object[clojure.lang.Delay 0x307e4c44 {:status :ready, :val 1696717714262}]

clojure.core/realized? can be used to check whether a delay instance has been realized or not:

(def d (delay (System/currentTimeMillis)))
;; ⇒ #'user/d
(realized? d)
;; ⇒ false
@d
;; ⇒ 1350997967984
(realized? d)
;; ⇒ true
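To show that the body of a delay runs only once even when several threads dereference it at the same time, the sketch below counts evaluations with an atom. The names runs and config are hypothetical, and the side effect inside the delay is there only to make the evaluation count visible:

```clojure
(def runs (atom 0))

(def config
  (delay
    (swap! runs inc)     ; count how many times the body is evaluated
    {:loaded true}))

;; dereference the same delay from 10 threads
(run! deref (mapv (fn [_] (future @config)) (range 10)))

@config
;; ⇒ {:loaded true}
@runs
;; ⇒ 1
```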

Futures

A Clojure future evaluates a piece of code in another thread. To instantiate a future, use the clojure.core/future macro. It returns immediately (it never blocks the current thread). To obtain the result of the computation, dereference the future:

(def ft (future (+ 1 2 3 4 5 6)))
;; ⇒ #'user/ft
ft
;; ⇒ #object[clojure.core$future_call$reify__8544 0x70029d2d {:status :ready, :val 21}]
@ft
;; ⇒ 21

Dereferencing a future blocks the current thread. Because some operations may take a very long time or get blocked forever, futures support a timeout specified when you dereference them:

;; the body sleeps for 10 seconds in another thread, then returns :completed;
;; the deref below blocks the current thread for at most 2 seconds
(def ft (future (Thread/sleep 10000) :completed))
;; ⇒ #'user/ft
(deref ft 2000 :timed-out)
;; ⇒ :timed-out

Subsequent access to futures using deref will use the cached value, just like it does for delays.

Just like delays, it is possible to check whether a future is realized or not with clojure.core/realized?:

(def ft (future (reduce + (range 0 10000))))
;; ⇒ #'user/ft
(realized? ft)
;; ⇒ true (may be false if checked before the computation has finished)
@ft
;; ⇒ 49995000

Clojure futures are evaluated in an unbounded, cached thread pool that is also used by agents (updated via clojure.core/send-off). This works well in many cases but may result in exhaustion of the heap or thrashing if too many futures are created.

Finally, Clojure futures implement java.util.concurrent.Future and can be used with Java APIs that accept them.
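
As a rough sketch of what that interoperability looks like, the example below calls java.util.concurrent.Future methods directly on Clojure futures and also uses the clojure.core/future-cancel and clojure.core/future-cancelled? helpers. The names slow and endless are made up for this illustration:

```clojure
(import '[java.util.concurrent Future TimeUnit])

;; a future whose body takes a short while to finish
(def slow (future (Thread/sleep 200) :done))

(instance? Future slow)
;; ⇒ true

;; Future#get with a timeout blocks for at most the given time
(.get ^Future slow 5 TimeUnit/SECONDS)
;; ⇒ :done
(.isDone ^Future slow)
;; ⇒ true

;; a future that would otherwise run for a long time can be cancelled
(def endless (future (Thread/sleep 60000) :never))
(future-cancel endless)
;; ⇒ true
(future-cancelled? endless)
;; ⇒ true
```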

Promises

Promises are yet another take on asynchronously realized values. They are similar to futures in certain ways:

  • They can be dereferenced with a timeout
  • They cache the realized value
  • They are supported by clojure.core/realized?

However, promises are realized not by evaluating a piece of code but by calling clojure.core/deliver on a promise along with a value:

;; promises have no code body (no code to evaluate)
(def p (promise))
;; ⇒ #'user/p
p
;; ⇒ #object[clojure.core$promise$reify__8591 0x2a2dc0a {:status :pending, :val nil}]
(realized? p)
;; ⇒ false

;; delivering a promise makes it realized
(deliver p {:result 42})
;; ⇒ #object[clojure.core$promise$reify__8591 0x2a2dc0a {:status :ready, :val {:result 42}}]
(realized? p)
;; ⇒ true
@p
;; ⇒ {:result 42}

Promises combine many of the benefits of callback-oriented asynchronous programming and the simpler blocking function calls model provided by dereferencing.
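
To illustrate, here is a small sketch in which one thread blocks on a promise while another one delivers it. The names result and consumer are made up for this example; in a real program the value would typically be delivered from a callback or a worker thread:

```clojure
;; `result` is a promise that will be fulfilled later
(def result (promise))

;; the consumer blocks on the promise in its own thread
(def consumer (future (str "got " @result)))

;; nothing has been delivered yet, so a timed deref gives up
(deref result 100 :nothing-yet)
;; ⇒ :nothing-yet

;; the producer (here, the current thread) delivers the value
(deliver result 42)

@consumer
;; ⇒ "got 42"

;; a promise can be delivered only once; later deliveries are ignored
(deliver result 43)
;; ⇒ nil
@result
;; ⇒ 42
```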

Watches and Validators

TBD

Using Intrinsic Locks ("synchronized") in Clojure

Explicit Locking

Every object on the JVM has an intrinsic lock (also referred to as monitor lock or simply monitor). By convention, a thread that needs to modify a field of a mutable object has to acquire the object's intrinsic lock, make the change, and then release the lock. As long as a thread owns an intrinsic lock, no other thread can acquire the same lock.

In Clojure, explicit synchronization like this is rarely necessary but may be needed for interoperability with Java code. When you need to execute a piece of code while holding an intrinsic lock of a mutable object, use the clojure.core/locking macro:

(let [l (java.util.ArrayList.)]
  (locking l
    (.add l 10))
  l)
;; ⇒ [10]

Note that for immutable Clojure data structures, explicit locking is effectively not necessary.
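
The single-threaded example above does not show why the lock matters. As a sketch, assume several threads append to the same java.util.ArrayList, which is not thread-safe; the names shared-list and writers are made up for this illustration:

```clojure
;; java.util.ArrayList is not thread-safe, so concurrent .add calls
;; must be serialized; here every writer locks on the list itself
(def shared-list (java.util.ArrayList.))

(def writers
  (mapv (fn [i]
          (future
            (dotimes [_ 1000]
              (locking shared-list
                (.add shared-list i)))))
        (range 4)))

;; wait for all writers to finish
(run! deref writers)

(locking shared-list
  (.size shared-list))
;; ⇒ 4000
```

Without the locking form, concurrent additions could be lost or could throw an exception while the list resizes its internal array.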

This section will need revisiting when Virtual Threads become more common.

Synchronization on Clojure Record Fields

TBD

Reducers (Clojure 1.5+)

java.util.concurrent

Overview

java.util.concurrent (sometimes abbreviated as j.u.c.) is a group of concurrency utilities in the JDK. Originally introduced in JDK 5 in 2004, they are developed and maintained by some of the experts in concurrency. j.u.c. is a mature library that has been heavily battle tested for well over a decade.

While Clojure provides a whole toolkit of concurrency features of its own, in certain cases the best solution is to use an existing j.u.c. class or even build a new abstraction on top of j.u.c. building blocks.

j.u.c. consists of multiple parts that cover common concurrent programming patterns and use cases: from thread pools (a.k.a. executors) to synchronization classes, to atomic variables, to concurrent collections, to the Fork/Join framework.

Executors (Thread Pools)

Overview

The Executor interface standardizes invocation, scheduling, execution, and control of asynchronous tasks. Those tasks can be executed in the calling thread, in newly created threads, or (most typically) in a thread pool. Thread pools can also have different implementations: for example, they can be of a fixed size or grow dynamically, use different error handling strategies, and so on.

Executors are most often instantiated using static methods of the java.util.concurrent.Executors class. To submit an operation to the pool, use the ExecutorService#submit method.

(import '[java.util.concurrent Executors ExecutorService Callable])

(let [^ExecutorService pool (Executors/newFixedThreadPool 16)
      ^Callable clbl        (cast Callable (fn []
                                             (reduce + (range 0 10000))))]
  (.submit pool clbl))
;; ⇒ #object[java.util.concurrent.FutureTask 0x77f4c040 "java.util.concurrent.FutureTask@77f4c040[Not completed, task = user$eval5944$fn__5945@4b8137c5]"]
;; ...wait some time...
*1
;; => #object[java.util.concurrent.FutureTask 0x77f4c040 "java.util.concurrent.FutureTask@77f4c040[Completed normally]"]
@*1
;; ⇒ 49995000

In the example above, we create a new fixed-size thread pool with 16 threads and submit a Clojure function for execution. Clojure functions implement both the Runnable and Callable interfaces, so they can be submitted as is. However, ExecutorService#submit is an overloaded method, so we cast the function to java.util.concurrent.Callable and type-hint the binding to avoid reflection warnings.

java.util.concurrent.Future

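ExecutorService#submit returns an instance of java.util.concurrent.Future. It is much like a Clojure future: although it does not implement clojure.lang.IDeref, clojure.core/deref (and the @ reader macro) accepts it and blocks until the result is available. To get the result through the Java API, use the j.u.c.Future#get method: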

(import '[java.util.concurrent Executors ExecutorService Callable])

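(let [^ExecutorService pool (Executors/newFixedThreadPool 16)
      ^Callable clbl        (cast Callable (fn []
                                             (reduce + (range 0 10000))))
      task                  (.submit pool clbl)]
  (try
    ;; blocks until the task has completed
    (.get task)
    (finally
      ;; release the pool's threads so that they do not keep the JVM alive
      (.shutdown pool))))
;; ⇒ 49995000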
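
When several tasks need to run on the same pool, ExecutorService#invokeAll accepts a collection of Callable instances and returns one j.u.c.Future per task. The sketch below is one possible way to use it from Clojure; the pool size and the name sums are arbitrary choices made for this illustration:

```clojure
(import '[java.util.concurrent Executors ExecutorService Future])

(def sums
  (let [^ExecutorService pool (Executors/newFixedThreadPool 4)
        ;; Clojure functions are Callable, so a vector of them can be passed as is
        tasks [(fn [] (reduce + (range 0 10)))
               (fn [] (reduce + (range 0 100)))
               (fn [] (reduce + (range 0 1000)))]]
    (try
      ;; invokeAll blocks until every task has finished and returns
      ;; the futures in the same order as the tasks
      (mapv (fn [^Future f] (.get f)) (.invokeAll pool tasks))
      (finally
        (.shutdown pool)))))

sums
;; ⇒ [45 4950 499500]
```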

Scheduled Executors

TBD

Countdown Latches

A countdown latch is a thread synchronization data structure. More specifically, it handles one group of concurrent workflows: "block the current thread until N other threads are done with their work". For example, "make a POST request to N URLs and continue when all N operations succeeded or failed".

Countdown latches are instances of java.util.concurrent.CountDownLatch and are instantiated with a positive integer (the initial count):

(import java.util.concurrent.CountDownLatch)

(CountDownLatch. n)

When the CountDownLatch#await method is executed, the calling thread blocks until the counter gets to 0. Invoking the CountDownLatch#countDown method decreases the counter by 1. Count down operations, of course, are supposed to be performed in other threads.

An example to demonstrate:

(let [cnt   (atom [])
      n     5
      latch (java.util.concurrent.CountDownLatch. n)]
  (doseq [i (range 0 n)]
    (.start (Thread. (fn []
                       (swap! cnt conj i)
                       (.countDown latch)))))
  (.await latch)
  @cnt)
;; note the ordering: starting N threads in parallel leads to
;; non-deterministic thread interleaving
;; ⇒ [0 1 2 4 3]

In the example above, we start multiple threads and block the current thread until all other threads are done. In this example, those other threads simply add an integer to a vector stored in an atom. More realistic scenarios will contact external services, the file system, perform some computation and so on.

Because the order of execution is not guaranteed when threads run concurrently (or in parallel), the result can differ from run to run. In the output above, 4 was added to the vector before 3.

Countdown latches are commonly used with an initial value of 1 to "block and wait until this operation in a different thread is done".
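
A minimal sketch of that single-count pattern follows. It uses the CountDownLatch#await overload that takes a timeout, so the waiting thread cannot block forever; the names done-latch and outcome are made up for this illustration:

```clojure
(import '[java.util.concurrent CountDownLatch TimeUnit])

(def done-latch (CountDownLatch. 1))
(def outcome (atom nil))

(.start (Thread. (fn []
                   ;; stand-in for real work done in another thread
                   (reset! outcome :finished)
                   (.countDown done-latch))))

;; await with a timeout returns true if the count reached zero in time
;; and false otherwise
(.await done-latch 5 TimeUnit/SECONDS)
;; ⇒ true
@outcome
;; ⇒ :finished
```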

Concurrent Collections

Most of the Java collections are mutable and were not designed for concurrency. java.util.concurrent includes a number of collections that are thread safe and can be used for passing data structures between threads.
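
As a brief sketch, here are two of those collections used from Clojure: a LinkedBlockingQueue that hands values from a producer thread to a consumer, and a ConcurrentHashMap that is updated from several threads. The names q, m and writers are made up for this illustration:

```clojure
(import '[java.util.concurrent ConcurrentHashMap LinkedBlockingQueue])

;; a blocking queue hands values from a producer thread to a consumer
(def q (LinkedBlockingQueue.))

(future (doseq [i (range 3)] (.put q i)))

;; .take blocks until an element is available
(vec (repeatedly 3 #(.take q)))
;; ⇒ [0 1 2]

;; a concurrent map can be updated from many threads without explicit locking
(def m (ConcurrentHashMap.))

(def writers (mapv (fn [i] (future (.put m i (* i i)))) (range 5)))
(run! deref writers)

(into (sorted-map) m)
;; ⇒ {0 0, 1 1, 2 4, 3 9, 4 16}
```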

Atomic Variables

The java.util.concurrent.atomic package provides a number of data structures that support lock-free thread-safe programming on a single variable (identity). They support a conditional atomic update operation (compare-and-swap, a.k.a. CAS).

Some of the more popular atomic types in the j.u.c.atomic package are AtomicBoolean, AtomicLong and AtomicReference.
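
To make the compare-and-swap operation concrete, here is a small sketch that uses the compareAndSet method of AtomicLong and AtomicReference. The names n and r are made up for this illustration, and the retry loop is only similar in spirit to what swap! does for atoms:

```clojure
(import '[java.util.concurrent.atomic AtomicLong AtomicReference])

(def n (AtomicLong. 10))

;; succeeds: the current value matches the expected value (10)
(.compareAndSet n 10 11)
;; ⇒ true
;; fails: the current value is now 11, not 10
(.compareAndSet n 10 12)
;; ⇒ false
(.get n)
;; ⇒ 11

;; a CAS retry loop: read the current value, compute a new one,
;; and try again if another thread changed the reference in between
(def r (AtomicReference. []))

(loop []
  (let [old (.get r)]
    (when-not (.compareAndSet r old (conj old :x))
      (recur))))

(.get r)
;; ⇒ [:x]
```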

Atomic references are pretty well covered in Clojure with atoms but occasionally may be used by other libraries. An example to demonstrate how to use an atomic long for a thread-safe counter:

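(import 'java.util.concurrent.atomic.AtomicLong)

(let [l       (AtomicLong.)
      threads (mapv (fn [_] (Thread. (fn [] (.incrementAndGet l))))
                    (range 50))]
  (run! #(.start ^Thread %) threads)
  ;; wait for all threads to finish before reading the counter
  (run! #(.join ^Thread %) threads)
  (.get l))
;; ⇒ 50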

Fork/Join Framework

TBD

Other Approaches to Concurrency

There are also other approaches to concurrency that neither Clojure nor Java cover. The growing adoption of message passing concurrency (the Actor model and CSP) led to the creation of several JVM-based frameworks for message passing. Some of the most popular ones include:

  • Akka
  • Jetlang
  • LMAX Disruptor

Akka's Java API can be used from Clojure either directly or via a library called Okku.

In LMAX Disruptor, event instances passed around are assumed to be mutable, so the framework is of limited use with Clojure.

Runtime Parallelism

Clojure was designed to be a hosted language. Its primary target, the JVM, provides runtime parallelism support. Traditional (platform) JVM threads map 1:1 to kernel threads. Those will be executed in parallel, provided that enough cores are available for the OS scheduler to use.

In Clojure, many concurrency features are built on top of JVM threads and thus benefit from runtime parallelism if the program is running on a multi-core machine.
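
As a small sketch of this, the snippet below asks the JVM how many cores it can use and checks that futures run on threads other than the calling one. The names caller and workers are made up for this illustration, and the number of available processors depends on the machine:

```clojure
;; number of cores the JVM can use (the value depends on the machine)
(.availableProcessors (Runtime/getRuntime))

;; name of the thread that evaluates this code
(def caller (.getName (Thread/currentThread)))

;; each future reports the name of the pool thread it runs on
(def workers
  (let [fs (mapv (fn [_] (future (.getName (Thread/currentThread))))
                 (range 4))]
    (mapv deref fs)))

;; none of the futures ran on the calling thread
(not-any? #(= caller %) workers)
;; ⇒ true
```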

Books

Concurrency is a broad topic and it would be silly to think that we can cover it well in just one guide. To get a better understanding of the subject, one can refer to a few excellent books:

Wrapping Up

One of Clojure's design goals was to make concurrent programming easier.

The key design decision was making Clojure data structures immutable (persistent) and separating the concepts of identity (references) and value. Immutable values eliminate many concurrency hazards, and ultimately make it easier for developers to reason about their programs.

Atoms are arguably the most commonly used reference type when working with concurrency (vars are used much more often but not for their concurrency semantics). Software Transactional Memory is a more specialized feature and has certain limitations (e.g., I/O operations must not be performed inside transactions). Finally, agents, futures, and promises provide an array of tools for working with asynchronous operations.

Concurrency is a hard fundamental problem. There is no single "best" solution or approach to it. On the JVM, Clojure offers several concurrency-related features of its own but also provides easy access to the java.util.concurrent primitives and libraries such as Akka or Jetlang.

Contributors

Michael Klishin michael@defprotocol.org, 2012 (original author)