Time, Clocks, and an Implementation in Erlang

L. Lamport's beautiful paper, "Time, Clocks, and the Ordering of Events in a Distributed System", describes an elegant distributed locking algorithm. I implement it in Erlang.

Ordering Events

The primary problem "Time, Clocks" attends to is to define the concept of a clock for a distributed system[0]. This is a surprisingly difficult problem if we don't want to introduce external notions of physical time. An important observation is that the whole point of a clock is that an event preceding another will occur at an earlier time. Lamport writes this as \(C(a) < C(b)\) (where \(C\) is the clock) if \(a \to b\) (\(a\) precedes \(b\)), and calls it the "Clock Condition". A clock is a function, in other words, which linearizes the notion of "happens before".

What do we mean by "happens before"? Well, we're talking about the ordering of some sorts of events, and also some amount of communication between processors. The events on a single processor are linearly ordered because they happen in one thread, but what about the communications? Well, we can treat a message as two events: sending the message, and receiving it. Sending the message must happen before receiving it (obviously); this constraint ties the events on one processor to those on another processor. For example, if processor A sees events a, b, and c, then sends message m to processor B, which then sees events d, e, and f, the ordering a → b → c → send m → receive m → d → e → f is forced, since a, b, and c precede sending m, sending m precedes receiving it, and receiving m precedes d, e, and f.

This relation → defines a partial ordering on events. In particular, it must satisfy these laws:

  • If events a and b occur on the same processor, a → b if a precedes b in the natural order of events on that processor.
  • For any message m, send m → receive m.
  • For no a is a → a true (no event precedes itself).
  • If a → b and b → c, then a → c ("precedes" is transitive).

With this partial ordering in place, we can now define the notion of a clock as above. But it's not yet clear that a system of clocks that follow the Clock Condition actually exists for an arbitrary system.

One does.

The core idea is that we can always increase the local time without breaking the Clock Condition. Each clock will store an integer (often these are called sequence numbers, or you can just call it the clock's time), and every time an event happens, we'll increase this integer to maintain the Clock Condition. If the event is a normal old event (not the receiving of a message), we'll just increment this integer by one, separating this event in time from all preceding ones. To handle message receives, we will send with each message a timestamp of when it was sent. When the message is received, the receiving process will increase its clock until it is greater than the message timestamp, unless of course its clock is already greater than this.

This clock allows any processor to totally order events it has seen[1]. This allows us to easily derive distributed algorithms that have to achieve some sort of order guarantee.

Distributed Locks

The particular problem we want to solve is relatively simple, but difficult enough to be representative. We want a method for a fixed number of processors to share a single resource with mutual exclusion. Each processor should be able to request access to the resource, have this request granted, and release access again, in such a way that no two processors think they both have access at any one time. We'd further like these requests to be granted in the order in which they are sent. This extra requirement makes even a totally centralized system non-trivial: we might imagine just setting up a central server and granting lock requests as they come in (and queuing them while the lock is held), but this doesn't work; I might send a lock request before you, but if your request reaches the central server first, you'd be incorrectly granted the lock.

However, the problem becomes easy once you establish a system of clocks on the processors as above. We'll have each processor send a lock request to every other processor when it wants to request the lock. The event of the lock being granted must occur after every other processor receives this message (otherwise, they won't know that the lock is being held). This naturally suggests waiting for an acknowledgment of the lock request, since the acknowledgment has to be sent after the lock request is received, and so it arrives only after the request has been seen.

Likewise, we want to force the event of being granted the lock to follow the event of releasing the lock that follows the previous granting of the lock. This constraint is also naturally achieved, simply by sending a release notification to all processes, and waiting for this release notification before considering the lock not held.

Note that our method of designing this algorithm was simply to

  1. Determine how events were constrained (granting must follow every processor knowing about the lock request), and
  2. Add messaging to enforce these constraints.

Given how difficult designing distributed algorithms usually is, this is incredibly easy in comparison.

Implementation

I decided to implement this algorithm, because an algorithm this beautiful deserves to be implemented. In particular, I decided to make this the first significant Erlang program I would write.

Why Erlang?

Erlang is not my native tongue (that would be Python or Javascript), nor even a language I've used in a while. The reason I decided to use Erlang is that this problem is a distributed systems problem. Unfortunately, I don't have a network of computers that I could easily program this on, so my only choice is to simulate a distributed system with multi-threading on one machine. While this doesn't usually give me a significant communication delay, at least the non-deterministic ordering of events should allow me to find bugs in my implementation effectively. Erlang's VM has a mature and feature-full implementation of a multi-threading system, including message-passing for concurrency and no shared memory. This would avoid accidental state-sharing (which is cheating). Lastly, I didn't yet know Erlang, and this seemed like a good Erlang exercise.

I could also write the same thing in Python, but Python is single-threaded: there's a "Global Interpreter Lock" that allows only one thread to execute bytecode at once (this is much less of a problem than it sounds in practice, since all I/O and most heavy numerics release this lock). Python instead switches between threads in increments of about 100 bytecodes. But since I expected the code to be fairly short, I wasn't convinced that this switching would be often enough to root out bugs. Plus, Python does allow shared memory between threads, so I might accidentally use it and not notice, defeating the point of the exercise.

Javascript is even worse a choice, since it is entirely single-threaded. You can simulate threading and delayed message receives with setTimeout and friends, but it's ugly. But I might yet do this to embed a little Javascript animation of the algorithm below.

Haskell was another choice, but I really don't want to go figure out how to properly compose monads to get asynchronous debug output printed to the console from my actors. Maybe the actors should clearly be arrows? Or maybe I'm just not comfortable enough with Haskell yet.

Simplifications

I'm going to simplify the problem a bit by assuming that: 1) messages are always received, and 2) between two particular processors, messages are received in the order they are sent. Both of these two assumptions can be avoided: we can ensure that messages are received by using timeouts and message acknowledgment, and we can ensure that message order is maintained by adding a "message number" in the message explicitly encoding the order. Lamport makes this simplification in the paper.

To simplify the solution, I'm going to express our algorithm above as a state machine (see page 4 of "Time, Clocks"), where each processor maintains a queue of received messages and responds to new messages as they arrive:

  • To request the lock, send a lock request message to each processor.
  • Upon receiving a lock request, add it to your queue, and send an acknowledgment.
  • Upon receiving an acknowledgment, add it to your queue.
  • You are granted the lock if your lock request precedes any others in your queue (precedes in time of sending) and if you've received a message from each other process sent after your request.
  • To release the lock, send a release notification to every other process.
  • Upon receiving a release notification from another process, remove all that process's lock requests from the queue.

I'm going to add an extra message, which tells a processor who the other processors in the lock network are, since in Erlang I have to construct those processors before I pass them as arguments to each process.

Take a moment to ensure that this state machine description coincides with the textual description given above.

Models and Extensibility

I'm going to model the lock algorithm as taking place between processes that do nothing but run this algorithm. To use this implementation, you'll want to create this lock process and keep a handle to it, and periodically send it request and release messages. The lock-algorithm process will send an acquired message back as soon as you acquire the lock, so you'll probably want to block until you receive that.
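From the client's side, that protocol might be wrapped up roughly as follows. This is only a sketch: the module name lock_client and the helper with_lock/2 are hypothetical names introduced for illustration, and it assumes the calling process is the parent that the lock process reports to, so that the acquired message lands in the caller's mailbox.

```erlang
-module(lock_client).
-export([with_lock/2]).

%% Hypothetical helper, not part of the lock module.
%% Lock is the pid of a lock process whose parent is the calling process.
with_lock(Lock, Fun) ->
    Lock ! request,                 % ask our lock process to request the lock
    receive acquired -> ok end,     % block until it tells us we hold the lock
    try
        Fun()                       % critical section
    after
        Lock ! release              % always release, even if Fun() throws
    end.
```

The try ... after form is there so that a crash inside the critical section still sends the release; without it, every other processor would wait forever.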

-module(lock).
-export([lockproc/1, test/0]).

Our lock process will be instantiated by a function lockproc/1: the function lockproc of one argument, a handle to the parent process. Internally, this will dispatch to a helper function lockproc/4, which will also track a few state variables: the list of other members of the lock network, the current clock time, and the message queue.

lockproc(Owner) ->
    lockproc(Owner, [], 0, []).

lockproc(Owner, Others, Time, Queue) -> ...

The function test/0 will set up a few processes to make debugging easier; we'll discuss it later.

Though it would be conceptually cleanest not to, I'm going to make clock handling part of the same process. In principle, I could add a "clock-maintenance" process to proxy messages to the lock algorithm process, which would add the current clock time to each message. This would separate concerns and allow reusing this clock-updating logic, but it would also double the number of processes that the Erlang VM has to track and thus double overhead. It would also make debugging more difficult, since almost all interactions of multiple concurrent actors are bug sources.

Instead, I'll have a simple clock function, in two variants: clock/1 will simply increment the clock (to be used when a normal event occurs), and clock/2 will also get the timestamp of a received message, and increase clock time accordingly.

clock(Clock) ->
    Clock + 1.
clock(Clock, Timestamp) ->
    max(Clock, Timestamp + 1).

The actual clock time will be carried around inside lockproc/4.
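To make the two rules concrete, here is a small worked trace. It is a sketch under stated assumptions: the module name clock_demo and the function trace/0 are hypothetical, the two clock functions are copied from above so the module compiles on its own, and the message is stamped with the sender's time before the sender ticks, which is how the lock code below sends its requests.

```erlang
-module(clock_demo).
-export([clock/1, clock/2, trace/0]).

%% The two clock functions from above, repeated so the module stands alone.
clock(Clock) ->
    Clock + 1.
clock(Clock, Timestamp) ->
    max(Clock, Timestamp + 1).

%% Hypothetical walk-through: A sees three local events and then sends m;
%% B, still at time 0, receives it.
trace() ->
    A0 = 0,
    A3 = clock(clock(clock(A0))),   % events a, b, c on A: clock is now 3
    Stamp = A3,                     % m carries A's current time as its timestamp
    A4 = clock(A3),                 % sending m is itself an event on A
    B0 = 0,
    B1 = clock(B0, Stamp),          % B jumps past the timestamp: max(0, 3 + 1) = 4
    B2 = clock(B1, 1),              % a stale timestamp changes nothing: max(4, 2) = 4
    {A4, B1, B2}.
```

Calling clock_demo:trace() returns {4,4,4}: the receive on B lands strictly after the timestamp that m carried, which is all the Clock Condition asks for.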

Coding the State Machine

In Erlang, message m is sent to process p with the syntax p ! m and messages can be received with the receive construct:

receive
    Pattern1 ->
	result1();
    Pattern2 ->
	result2();
    ... -> ...
end

(Punctuation in Erlang is idiosyncratic; it draws a lot from Prolog, which wasn't necessarily a good role model. I'd suggest ignoring it while reading (indentation is a better guide), and consulting Learn You Some Erlang if you want to write some Erlang.)

However, I want to have a way to refer to the message as a whole (mostly for debugging purposes), and I didn't find a way to bind the entire pattern to a name (like Haskell's at-patterns), so I just made a catch-all receive message. We'll pick it apart later.

lockproc(Owner, Others, Time, Queue) ->
    receive Msg -> Msg = Msg end,
    io:format("~p got ~p~n", [Owner, Msg]),

The io:format/2 is something similar (but not entirely so) to Common Lisp's (FORMAT). The ~p is like %s in C-like languages, and the ~n is like a \n. I'm using it to make it clear what the system is doing: I'll print this debugging output every time I receive a message, which makes it easy to track what's going on in my lock system.
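For what it's worth, Erlang patterns can be aliased: writing Pattern = Name in a receive (or case) clause matches the pattern and also binds the whole value to Name, much like an at-pattern. The following is a minimal sketch of that idea, not a rewrite of the code in this post; the module name alias_demo and the function classify/0 are hypothetical.

```erlang
-module(alias_demo).
-export([classify/0]).

%% Hypothetical example: wait for one message, destructure it, and still keep
%% the whole message around under the name Msg.
classify() ->
    receive
        {Timestamp, From, request} = Msg ->
            {request, Timestamp, From, Msg};
        {Timestamp, From, ack} = Msg ->
            {ack, Timestamp, From, Msg};
        Msg ->
            {other, Msg}            % catch-all, like the receive above
    end.
```

With this form the debugging print could live inside each clause, at the cost of repeating it per message type; the catch-all receive followed by a case keeps the print in one place.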

Next, we'll want to pattern-match on the message, executing different code depending on what type of message we get. In Erlang, this can be done with case ... of.

case Msg of
    ...
end.

First, we need to establish who our neighbors are.

{neighbors, Neighbors} ->
    lockproc(Owner, Neighbors, Time, Queue);

The curly braces here denote a tuple. Note the use of recursion to both keep track of state and to stay in our message-receiving loop. This seems to be a common pattern in Erlang. The recursion is a tail-recursion, so this has no costs in stack space or similar. In Erlang, capitalized symbols are variables, but lowercase symbols are just that: symbols (Erlang calls them atoms). So the pattern neighbors above matches only the literal symbol neighbors; it identifies the type of message.
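The state-in-the-arguments pattern is easier to see in isolation. Here is a toy sketch, unrelated to the lock itself: a counter process whose only state is the argument of its receive loop. The module name counter_demo and the message names increment, get, and stop are hypothetical.

```erlang
-module(counter_demo).
-export([start/0, loop/1]).

%% Hypothetical example of the same pattern: the "state" is just the argument
%% of a tail-recursive receive loop.
start() ->
    spawn(?MODULE, loop, [0]).

loop(Count) ->
    receive
        increment ->
            loop(Count + 1);            % new state, same loop
        {get, From} ->
            From ! {count, Count},
            loop(Count);                % state unchanged
        stop ->
            ok                          % no recursive call: the process ends
    end.
```

Each clause decides what the next state is by choosing the arguments of the recursive call, which is exactly what the neighbors clause above does with Neighbors.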

Next, we'll respond to a request from our parent process to acquire the lock by sending all processes in the lock network a lock request.

request ->
    lists:foreach(fun(Proc)-> Proc ! {Time, self(), request} end, Others),
    lockproc(Owner, Others, clock(Time), Queue);

We can assume this request can only come from our parent process, so I'm not sending along a return address in the message or anything. I'm using lists:foreach/2 (lists is a module which contains many of the standard functional list functions) to actually send a message to every process in the network, and I'm also updating the time in our recursive call. Note the value of the return address in the message: self(). In Erlang, self/0 returns a handle to the current process.

If we can send lock requests, we must also be able to receive them. This requires updating the message queue and also sending an acknowledgment. Following age-old tradition, the acknowledgment message is called an "ack".

{Timestamp, From, request} ->
    TimeNow = clock(Time, Timestamp),
    QueueNow = [{Timestamp, From, request} | Queue],
    From ! {TimeNow, self(), ack},
    lockproc(Owner, Others, TimeNow, QueueNow);

Note the use of pattern matching to bind the Timestamp and From variables while also testing that the message type is a request. After receiving the message, we update the clock, add the message to our queue, and send a message back, using the handle From that we extracted from the lock request.

The notation [H | T] is analogous to (cons first rest) in Lisp or (x : xs) in Haskell: it adds the element H to the front of list T.

Receiving the acknowledgment just adds it to the queue.

{Timestamp, From, ack} ->
    TimeNow = clock(Time, Timestamp),
    QueueNow = [{Timestamp, From, ack} | Queue],
    QueueNow2 = check_acquire(Owner, Others, QueueNow),
    lockproc(Owner, Others, TimeNow, QueueNow2);

This pattern and the one above do not overlap, because request and ack are symbols, not variables to bind values to. We update the time and queue, and also check whether we have actually acquired the lock. In principle, we could have put the recursive call outside the case statement and checked whether we have acquired the lock right before it, thus avoiding having to put check_acquire/3 calls in each case. But not every message can lead to acquiring the lock, so checking only where it can happen avoids an expensive check. Hoisting the call would also mean a lot more bookkeeping with our state variables: in Erlang you can't change the value of a variable, so we'd have to keep X and XNow variables for each of our state variables.

We'll discuss check_acquire/3 later. For now, it should be enough to note that it handles notifying the owner and printing debug output, and also returns a new queue.

Now to releasing the lock. This requires notifying all other processors.

release ->
    io:format("~p released lock~n", [Owner]),
    lists:foreach(fun(Proc)-> Proc ! {Time, self(), release} end, Others),
    lockproc(Owner, Others, clock(Time), Queue);

Receiving such a release notification requires removing all requests from the sending process.

{Timestamp, From, release} ->
    QueueNow = remove(fun({_, F, request}) -> F == From;
			 ({_, _, ack}) -> false
		      end, Queue),

    QueueNow2 = check_acquire(Owner, Others, QueueNow),
    lockproc(Owner, Others, clock(Time, Timestamp), QueueNow2)

Unfortunately, the lists module doesn't include the remove function (it includes filter, but remove better corresponds both to Lamport's paper and to the description of the solution above). Luckily, it's easy to implement.

remove(_, []) -> [];
remove(Q, [H | T]) ->
    case apply(Q, [H]) of
       true  -> remove(Q, T);
       false -> [H | remove(Q, T)]
    end.

Alternatively, one could implement it as

remove(Q, L) ->
    lists:filter(fun(X) -> not Q(X) end, L).

Note that we again check whether we've acquired the lock. Indeed, you only acquire a lock when the final acknowledgment for your lock request is received, or when a lock is released by someone else and you are next in line.

Testing for Lock Acquisition

We test whether we've acquired the lock in check_acquire/3.

check_acquire(Owner, Others, Queue) ->
    ...

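Up until now, we've been storing the message queue as a list that we prepend to (for efficiency's sake). We'll want the Queue sorted by timestamp (that is, by time of sending), which we can do with lists:sort/2. Unfortunately, I didn't find a general sorted(key=f) or sortBy, which would make the sort cleaner; lists:keysort/2, which sorts tuples by a chosen element, appears to cover this particular case.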

check_acquire(Owner, Others, Queue) ->
    QueueSorted = lists:sort(fun({T1, _, _}, {T2, _, _}) -> T1 =< T2 end,
			     Queue),
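One caveat: two requests can carry the same timestamp, and a comparison on the timestamp alone then leaves their relative order to whatever order they happened to arrive in, which can differ between processes. Footnote [1] suggests breaking ties by PID. A sketch of such a comparison follows; the module name queue_order and the function sort_queue/1 are hypothetical, and it relies on Erlang's built-in term ordering, under which same-sized tuples compare element by element and pids on a single node are ordered among themselves.

```erlang
-module(queue_order).
-export([sort_queue/1]).

%% Hypothetical variant of the sort: order by timestamp, then by sender,
%% so every process sorts equal-timestamp messages the same way.
sort_queue(Queue) ->
    lists:sort(fun({T1, F1, _}, {T2, F2, _}) -> {T1, F1} =< {T2, F2} end,
               Queue).
```

I have only reasoned about this on a single node; whether pid ordering is a sensible global tie-breaker across several machines is a separate question.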

Then, we can trim off any extra ack messages at the start of the list, since they will be before any requests and thus not matter. lists:dropwhile/2 works nicely.

QueueTrimmed = lists:dropwhile(fun({_, _, Type}) -> Type == ack end,
			       QueueSorted),

If this trimmed and sorted queue starts with our request and contains messages from every other processor, we're good. I'll break this check out into can_acquire/3 and deal with it separately.

can_acquire(Owner, Owners, [{_, Owner, request} | Msgs]) ->
    MsgFroms = lists:map(fun({_, F, _}) -> F end, Msgs),
    lists:all(fun(O) -> lists:member(O, MsgFroms) end, Owners);
can_acquire(_, _, _) ->
    false.

This check is \(O(N M)\), where \(N\) is the number of processes and \(M\) is the number of messages. This could be reduced to \(O(N + M)\) if we use an array or hash-table to store whether we've seen a message from a given processor and update it message-by-message, but this gets hairy in a purely-functional environment like Erlang (and Erlang has no monads to help with side-effecty things like arrays), and this isn't intended to be a super-efficient implementation anyway, so I'll settle for the above.
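For the curious, the standard sets module gets most of the way there without any mutation. The following is a sketch of that variant, not the version used in the rest of this post; the module name acquire_demo is hypothetical, and the cost is only close to \(O(N + M)\), up to whatever the sets module charges per insertion and lookup.

```erlang
-module(acquire_demo).
-export([can_acquire/3]).

%% Hypothetical variant of can_acquire/3 that collects the senders into a set
%% once, instead of scanning the message list for every process.
can_acquire(Owner, Owners, [{_, Owner, request} | Msgs]) ->
    Senders = sets:from_list([F || {_, F, _} <- Msgs]),
    lists:all(fun(O) -> sets:is_element(O, Senders) end, Owners);
can_acquire(_, _, _) ->
    false.
```

The first clause still uses the repeated variable Owner to insist that the head of the queue is our own request; only the membership test has changed.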

Finally, if we can acquire the lock, we send the parent process a message and write a debugging message.

case can_acquire(self(), Others, QueueTrimmed) of
    true -> Owner ! acquired,
	    io:format("~p acquired lock ~n", [Owner]);
    false -> false
end,
QueueTrimmed.

I return the trimmed queue since we deleted a few ack messages from it, and we might as well not remove them again. We don't have to reverse the queue back, since the only time the order matters is in can_acquire/3, and we sort the queue before we call it.

Testing

To make testing more efficient, I added the following helper function test/0, which sets up two processors sharing a lock.

test() ->
    P1 = spawn(lock, lockproc, [spawn(fun() -> 1 end)]),
    P2 = spawn(lock, lockproc, [spawn(fun() -> 1 end)]),
    P1 ! {neighbors, [P1, P2]},
    P2 ! {neighbors, [P1, P2]},
    [P1, P2].

I use dummy processors as the parent processes since the debug output will let me know what's going on.

Unfortunately, the bind-once nature of Erlang variables makes debugging more annoying than it ought to be. You can't just write [P1, P2] = lock:test(). once and then copy-and-paste it, since you'd be rebinding the variables P1 and P2; so you end up adding stupid little sequence numbers to your variables and constantly editing them. I used Px1, Px2, where the x ran through digits and then the alphabet.

193> c("/home/pavpanchekha/dev/distributed/lock",
       [{outdir, "/home/pavpanchekha/dev/distributed/"}]).
{ok,lock}

194> [Pv1, Pv2] = lock:test().
<0.622.0> got {neighbors,[<0.623.0>,<0.625.0>]}
<0.624.0> got {neighbors,[<0.623.0>,<0.625.0>]}
[<0.623.0>,<0.625.0>]

195> Pv1 ! request.
<0.622.0> got request
<0.622.0> got {0,<0.623.0>,request}
<0.624.0> got {0,<0.623.0>,request}
<0.622.0> got {1,<0.623.0>,ack}
<0.623.0>: [<0.623.0>,<0.625.0>]
request

=ERROR REPORT==== 3-Jan-2012::00:52:00 ===
Error in process <0.623.0> with exit value:
  {undef,[{list,map,[#Fun<lock.0.78593084>,[{1,<0.623.0>,ack}]]},
	 {lock,can_acquire,3},{lock,check_acquire,4},{lock,lockproc,5}]}

196> c("/home/pavpanchekha/dev/distributed/lock",
       [{outdir, "/home/pavpanchekha/dev/distributed/"}]).
{ok,lock}

197> [Pw1, Pw2] = lock:test().
<0.633.0> got {neighbors,[<0.634.0>,<0.636.0>]}
<0.635.0> got {neighbors,[<0.634.0>,<0.636.0>]}
[<0.634.0>,<0.636.0>]

Hopefully, there's a better way, and I, being an Erlang newbie, don't know it. Tell me if you know of one!
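One thing that may help: the Erlang shell has built-in commands f() and f(X), which forget all variable bindings or a single one, respectively. A session might then go something like the sketch below (output omitted, since the pids will differ from run to run).

```erlang
1> [P1, P2] = lock:test().
2> f().                      % forget every binding made in this shell session
3> [P1, P2] = lock:test().   % P1 and P2 can now be bound again
4> f(P1).                    % or forget just one variable
```

This only affects the shell; inside a module, variables remain single-assignment.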

On the other hand, the comma operator does lead to the very nice ability to send multiple messages at a time, to simulate conditions where multiple processors try to request the lock at once.

9> P12 ! request, P11 ! request.
<0.52.0> got request
<0.50.0> got request
<0.52.0> got {4,<0.53.0>,request}
<0.50.0> got {4,<0.53.0>,request}
<0.52.0> got {4,<0.51.0>,request}
<0.50.0> got {4,<0.51.0>,request}
<0.52.0> got {5,<0.53.0>,ack}
<0.50.0> got {5,<0.53.0>,ack}
<0.52.0> acquired lock 
<0.50.0> got {5,<0.51.0>,ack}
<0.52.0> got {5,<0.51.0>,ack}

I'm not sure I'm sold on the no-mutation thing in general. On one hand, it does mean that I don't have problems with state manipulation. On the other hand, I'm already constrained not to have loops inside my functions (tail recursion!), so it seems to me that state control will be really simple no matter what, while the benefit in clarity by being able to say queue.append(msg) is large. Maybe eliminating loops inside a function body (by, say, making closures close-by-value, not close-by-reference) wouldn't remove any power from Erlang and would make the single-assignment problem not as annoying. I have the same complaints about Haskell.

Conclusion

The system works, in all combinations in which I tested it, including multiple nodes trying to access the lock at once, and various combinations of request/release. I don't have a way of testing performance, unfortunately, since this emphatically isn't spread over multiple computers and so message sends are actually rather cheap. I don't see a way of asking Erlang to randomly slow down message receives by several milliseconds.
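One way to approximate slow links, as far as I can tell, is to delay on the sending side rather than the receiving side. The sketch below uses erlang:send_after/3 and rand:uniform/1, both standard; the module name slow_send and the function send/3 are hypothetical. Note that random delays can reorder messages between the same pair of processes, which breaks the second simplification above, so it would only be suitable for experiments that account for that.

```erlang
-module(slow_send).
-export([send/3]).

%% Hypothetical helper: deliver Msg to Pid after a random delay of
%% 1..MaxMs milliseconds, instead of immediately as Pid ! Msg would.
send(Pid, Msg, MaxMs) ->
    Delay = rand:uniform(MaxMs),           % integer in 1..MaxMs
    erlang:send_after(Delay, Pid, Msg),    % returns a timer reference
    ok.
```

Swapping this in for the Proc ! ... sends inside the lists:foreach calls would be the obvious place to try it.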

Of course, this isn't an algorithm that one ought to use in production, if only because it doesn't deal with faulty nodes at all — you just end up waiting on faulty nodes forever. Lamport's "The Implementation of Reliable Distributed Multiprocess Systems" is a good introduction to implementing fault-tolerance; after you've read through that, work the standard Paxos algorithm and perhaps read Google's implementation white-paper, Paxos Made Live.

I particularly enjoyed coding the algorithm because, with the logical clock system in place, it becomes rather easy to reason about operations. In fact, the algorithm can be summarized in English: "To lock, tell everyone else to acknowledge you, and then take the lock if no one else requested it before you did. To unlock, tell everyone else to forget about your locking it". Beautifully simple, and the logical clock mechanism lets this be rigorous by making it easy to talk about "before".

I think the choice of Erlang was a good one for the implementation. Pattern matching request types made the core state machine very easy to implement, and various niceties like a functional list library and literal symbols make programming in Erlang rather pleasant. Even the syntax turns out not to be much of an issue. It is very consistent, even if it makes refactoring a huge pain.

On the other hand, I was unhappy with the facilities I was given for decoupling components of the system. In particular, the fact that there wasn't a performant way to separate the logical clock mechanism from the lock algorithm is a pain. As far as I could tell, I either have to add additional processes to act as proxies, or I have to structure my processes awkwardly. There's a good chance I'm simply missing some Erlang feature, and if so, do tell me.

Overall, I'm happy I learned some Erlang and read a good distributed systems paper. Hopefully you did too, dear reader.

Acknowledgments

Anders Lindgren, for the erlang-mode for Emacs, which completely blew me away. It's incredible! It's almost as fun to use as SLIME, and that's for the language Emacs was built to edit. It by itself might be a reason to explore Erlang a bit more.

Ericsson, for Erlang, which was rather enjoyable to work with.

Leslie Lamport, for a beautifully clear and elegant paper.

[0] A distributed system is one where: 1) there exist multiple actors that can perform computation independently of and concurrently with each other, and 2) the communication delay between these actors is non-negligible. Multi-threading systems with shared memory are not distributed (since communication is free, or nearly free). The usual visual for a distributed system is several computers hooked up to a local network or the Internet. I'll call each of these independent actors a "processor", since it can process data, but again keep in mind that an actual single-computer multi-processor system is usually not a distributed system, since communication cost is low.

[1] Perhaps with the help of some tie-breaking mechanism. This tie-breaking mechanism has to be global if our ordering of events is to be consistent, but it can otherwise be arbitrary. For example, we can order events by the clock time, and then by the PID of the processor they happened on.

Share it; it's CC-BY-SA licensed.

Any opinions, findings, and conclusions or recommendations expressed in this material are those of the author and do not necessarily reflect the views of the National Science Foundation.