Tuesday, October 25, 2011

The Impossible Isolates

…in which I continue to expound on the vast superiority of the Erlang Way, this time with the young and innocent Dart on the receiving end (but don't worry, Dart can handle multiple of those. The problem is that it must).

Google recently published a "technical preview" of the Dart language. It contains a notion of “isolates” — described by the language specification as "actor-like entities" — which serve partly as sandboxes, partly as "unit[s] of concurrency".

In the following, I intend to demonstrate that though the Dart isolates are presented as “inspired by Erlang [processes]”, there is much to be gained by taking a few more pages out of Erlang's book.

What's Impossible in Your Favorite Language?

When programming languages are discussed, focus tends to be on the things that are possible in a given language.
But what's at least as important — if you're doing anything beyond toy programs and obfuscation contests — are the things that are impossible in the language. What can't happen?

Impossibilities are very useful. They can be a great help when reasoning about programs — and when constructing programs which lend themselves to reasoning.
The impossible helps isolate relevant effect from irrelevant cause.

It's the same in real life: we do a lot of optimizations based on what we know can't happen; being able to make assumptions about the world is what lets us do, well, anything complex at all.

Gravity is quite reliable, for instance. I rarely bother to strap things to my desk these days, but rely on plain old gravity to keep them where I put them.

If I place something on my desk, then come back later to find that thing lying on the floor, and wonder how that came to be — then I may suspect a number of things, but a gravity outage isn't even considered a candidate explanation.
That's the difference between unlikely and impossible.

(With the different large-ish systems I've been working on for the past few years, unlikely happens all the time. And implausible (though not impossible) happens about once every 1–2 years (these events often involve virtualization).)

Narrowing down the "what may happen / what may have happened" scope is a crucial part of program debugging, understanding and maintenance, and hence of developing.

The blessings of Erlang

One of the critical things that Erlang provides, and a major reason why I like it, is the way its semantics and building blocks prevent surprises.
Two processes can't interfere with each other's state; they can't affect each other's control flow except through message passing; and if a process goes down, it disappears completely, without leaking resources.

The great thing about Erlang is not in itself that it allows you to do many things simultaneously, but that at the same time it lets you focus on one thing at a time. It is just as much about being able to think non-concurrently, as it's about concurrency. That's the way that complexity is handled: by isolation, through focus on the task at hand.
Much of the time, the result is that concerns separate nicely.

The three pillars of sound state

As Kresten Krab Thorup points out in his posting “Dart: An Erlanger's Reflections”, the three major features of Erlang which enable such focus on one task are isolation, sequencing, and fault handling.

These features consist of four (1+1+2) properties which can be summed up as, respectively,

  • Other processes cannot interfere with a process's state (except through message passing)
  • A process cannot interfere with its own state (but has a single, predictable control flow)
  • A process which fails cannot continue to run after the failure (with a possibly corrupt state)
  • When a process fails, and thus ceases to run, entities dependent on that process will be notified of its demise in a timely manner.

(Or, more concisely: “Others won't interfere”, “I myself won't interfere”, “Failure will not linger”, and “Failure will not go unnoticed”.)
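The fourth property is directly supported by the language: any process can monitor (or link to) another and be told when it dies. A minimal sketch of my own, not taken from any of the cited sources:

```erlang
%% Sketch: making a process's demise observable via a monitor.
-module(demise_demo).
-export([run/0]).

run() ->
    Pid = spawn(fun() -> exit(boom) end),
    Ref = monitor(process, Pid),
    receive
        {'DOWN', Ref, process, Pid, Reason} ->
            %% Reason is boom, or noproc if Pid was already gone
            {observed_exit, Reason}
    after 1000 ->
        no_notification
    end.
```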

Of these, I will in the following focus on the first three — the ones which are, incidentally, stated as impossibilities of the aforementioned kind.
The absence of any of these properties would destroy locality in reasoning.

One consequence of this set of properties is related to the famous "Let it crash" philosophy of the Erlang tradition: that even error handling is done only by the survivors — those processes with a “good” (non-corrupted) state.

The state of Dart (and stacklessness of its isolates)

The reason I write about all of this just now is related to Google's recently-published language “Dart” (released as an early preview; the spec is not final yet).

Dart introduces a notion of isolates — like Erlang's light-weight processes, isolates have separate heaps. Unlike Erlang processes, however, isolates don't have separate stacks, but are activated on an empty stack each time, through callbacks which process the incoming messages on a FIFO basis.

And as far as I can tell from the specification, nothing particular happens to an isolate if one of its callbacks terminates abnormally.
[Actually, I am told that as it is now, that will cause the Dart VM to crash. I will assume that this behaviour is not intended.]

Indeed, isolates appear to be more passive than active objects — as far as I can tell, their life spans are determined, not explicitly and from within like Erlang processes' life spans, but implicitly and from without, by reachability, like OOP objects.

And that difference is a significant one: it breaks two of the three “pillars of sound state”: an isolate can surprise itself by handling an unrelated event when it perhaps shouldn't, and an event handler crashing within an isolate can leave its state inconsistent without any consequences for the isolate's life cycle.

Basically, what this means is that the invariants of an isolate's state must be re-established by all of its event handlers, always, even when they terminate abnormally. This may just be par for the course in the eyes of many programmers, of course, but from an Erlang developer's perspective it's very much a “close but no cigar” situation.

The state-space explosion that commonly happens when a state machine must be able to handle every kind of event in every state is exactly one of the things that drove the development of Erlang in the first place (and led to the “selective receive” feature).

Case in point: Synchronous calls

An example of a common pattern where this matters is as follows: One actor needs, during the processing of a message, to contact another actor for information necessary to complete the processing. This is a situation that commonly occurs in Erlang programs, and in Erlang, the typical solution would be to make a synchronous call to the other actor: send a request, then wait for a response before continuing execution. (In most cases, a timeout would be set as well, to ensure that execution will in fact continue.)

This send-request, wait-for-response operation pair can be put together in a function, so that the invocation of another actor looks just like a normal function invocation — the communication can be encapsulated; in fact, the send/receive code is rarely actually spelled out, because it is already provided by the standard library.
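As a sketch of that encapsulation (hand-rolled here for illustration; in practice one would use gen_server:call/2,3 from the standard library):

```erlang
%% Sketch: a synchronous call wrapped as an ordinary function.
%% The selective receive skips any other messages in the inbox.
call(Server, Request) ->
    Ref = make_ref(),
    Server ! {call, self(), Ref, Request},
    receive
        {reply, Ref, Result} -> Result
    after 5000 ->
        exit(timeout)   % ensure that execution does in fact continue
    end.
```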

In Dart, the situation is somewhat different: to do a synchronous call to another actor, you'd create a Promise (a one-shot message queue), and send the send-end plus the request to the actor in question. That actor would then put the result into the Promise. But the caller cannot just explicitly wait for the response; instead, it must register a response handler on the Promise. The handler would typically be a closure which carries whatever state is needed to complete the processing.

All of these operations complete right away — execution continues immediately. This means that the remote call cannot be encapsulated and treated like a normal function call; modularity suffers. In Erlang, it is easy to change whether, how and to which other actor to make such calls — it's a local change, completely transparent to the call site; in Dart it looks like it wouldn't be so simple a matter.

If you were to approach the Erlang way in Dart, you'd have to write in continuation passing style (CPS), so that the call would always happen on a nearly empty stack — so that returning from a function means returning from the event handler; this means that the current constraints — that event handling always begins and ends with an empty stack — wouldn't matter, because the real stack would be in the continuation.
When using CPS, calling another actor could indeed be made to look just like another function call.

Except that it wouldn't behave like an ordinary function call, because other incoming messages could be processed between the call request and the call response — leading to the problems mentioned above with self-interference.

The funny thing about this CPS-based approach, by the way, is that this attempt to buy back the between-events stack that is necessary for encapsulation of calls, essentially overdoes it: what you get is not one call stack, but a number of simultaneous call stacks (disguised as response handler continuations), all ready to continue execution as responses become ready. A multitude of threads of execution which can trip each other up and cause all sorts of nondeterminism-induced problems.

If the intention of the single-threaded isolates is to reduce the complexity of developing concurrent systems, then this would not be a particularly good outcome.
No stack is too little; a multitude of stacks is too many. One is the number we want, the one we can reason about.

Please animate the isolates!

The above describes the present situation, as I read the Dart specification (which I must admit I haven't done end-to-end; I've mainly been focusing on the isolate- and exception-related parts).

Since the Dart language is still in development, and Google is requesting input to the development process, these things may of course change.

My input to the process, then, is this:

Give the isolates an explicit life cycle — just isolating the heap, thus ensuring sequential heap access, is a good first step, but to fully simplify matters and keep invariants local, an isolate must be in control of its own lifecycle, including which types of events it will be ready to process at any given moment.
In short, give them liberty and give them death! (to paraphrase Patrick Henry rather freely).

Monday, October 10, 2011

Dynamic Selective Receive — an Erlang hack

Erlang is a language which is dynamic in many aspects.
One of the things that are resolved statically, however, is pattern matching; the decision trees used for branching in function clauses, case statements, and receive statements are constructed at compile time.

For instance, a set of patterns like

[X] when X==0 orelse X==100
[H|T]
[]

is translated by the Erlang compiler into the following decision tree:

is_nonempty_list(B)?
+-Y-> [H|T] = B
|     is_nil(T)?
|     +-Y-> H == 0?
|     |     +-Y-> case 1
|     |     |     ^
|     |     |     |
|     |     |     Y
|     |     |     |
|     |     +-N-> H == 100?
|     |           |
|     |           N
|     |           |
|     |           v
|     +---------> case 2
|
+-N-> is_nil(B)?
      +-Y-> case 3
      +-N-> No match.

The Erlang compiler is quite good at constructing such decision trees, which is good, because it's critical to the performance of typical Erlang programs.
(Actually, if the compiler were poor at that job, “typical Erlang programs” would probably often look different, because developers would be more inclined to write their own decision trees. Compilers shape developers, to some extent.)

If you want to construct a predicate dynamically, you can do that, using function composition or an interpreter function — that is for instance how the Eshell is implemented. For instance, the first pattern above can be described as

{is_pair,
 {'or', {is_eq, 0}, {is_eq, 100}},
  is_nil}

which can be interpreted by a predicate evaluator like:

eval(is_nil, []) -> true;
eval({is_pair, HP, TP}, [H|T]) -> eval(HP,H) andalso eval(TP,T);
eval({is_eq, V}, V) -> true;
eval({'or', P1,P2}, V) -> eval(P1,V) orelse eval(P2,V);
eval(P, V) when is_function(P,1) -> P(V);
eval(_, _) -> false.

However, you can't do a selective receive using such a predicate, because one of those statically-constructed decision trees is always used for selecting which message to extract from the inbox; and once it's taken out of the inbox it cannot be put back again. Only after a message is extracted from the inbox can more general code be applied to it.
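For concreteness, this is what a static selective receive looks like; the pattern is part of the program text, and it is this that gets compiled into one of the decision trees described above:

```erlang
%% Static selective receive (sketch): the pattern {ping, X} is fixed
%% at compile time and cannot be supplied as a runtime value.
wait_for_ping(Timeout) ->
    receive
        {ping, X} -> {pong, X}
    after Timeout ->
        timeout
    end.
```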

When is this a problem?
Well, the Erlang shell is one example; here, dynamic selective receive will have to be faked: buffering is needed, and in fact the timeout mechanism of the Erlang receive construct is (last I checked, which is a few years ago) emulated only incompletely. I know that I filed a bug report about that issue, but I can't find it now.

Also, it's an issue that may sometimes arise when you're trying to combine two concerns in one process, and each of the concerns communicate with other processes; in such cases, it would sometimes be convenient to have parameterizable selective receive.

Solutions

For this problem, I've found a solution which looks promising.

The solution is sufficiently cheaty, however, that I should probably first mention a couple of more “in the spirit” solutions; all three solutions can be considered to be cheating, but at least I can offer different flavours of unruliness.

  1. Construct, dynamically, a module which implements a function which does the selective receive. Constructing and compiling modules dynamically is relatively easy in Erlang. Calling a function in a runtime-specified module is just everyday Erlang; it's Erlang's primary kind of polymorphism.

    You wouldn't want to do this module construction business for something that needs to be done a million times, but it's probably a good exercise.

  2. Take as a parameter a predicate function which is to be used for selecting the message. Access the process's inbox through the backdoor: process_info(self(), messages), and apply the predicate to each message in turn until an acceptable one is found (or the end of the list is reached, in which case you wait a bit and retry). Now you know exactly which message you want to extract, and that's straightforward to do (the decision tree for that is static).

    I can only recommend this approach if you don't care about performance or ugly hacks. Also, there's no really good way of handling the no-such-message-yet case, as far as I can see.
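For illustration, solution 2 might be sketched like this (my own sketch, with a crude poll standing in for the missing no-such-message-yet handling):

```erlang
%% Sketch of solution 2: scan the inbox through the backdoor, then
%% extract the chosen message with a static (equality) receive.
pred_receive(Pred, Timeout) ->
    {messages, Msgs} = process_info(self(), messages),
    case first_match(Pred, Msgs) of
        {ok, Msg} ->
            receive Msg -> Msg end;   % Msg is bound: matches by equality
        none when Timeout =< 0 ->
            timeout;
        none ->
            timer:sleep(50),          % crude polling; see caveat above
            pred_receive(Pred, Timeout - 50)
    end.

first_match(_Pred, []) -> none;
first_match(Pred, [M|Ms]) ->
    case Pred(M) of
        true  -> {ok, M};
        false -> first_match(Pred, Ms)
    end.
```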

  3. Take advantage of the fact that what is possible in Beam — the VM's instruction set — is a superset of what's possible in Erlang. In particular with respect to the receive construct. This is the solution that I will describe below.

The following module is a quite general solution which uses a matchspec as a predicate. It is written in Erlang assembly, because at this level this hack becomes possible:

%% File: dyn_sel_recv.S
{module, dyn_sel_recv}.  %% version = 0
{exports, [{match_spec_receive,2}]}.
{labels, 5}.

{function,match_spec_receive,2,2}.
  {label,1}.
    {func_info,{atom,match_spec_magic},{atom,match_spec_receive},2}.
  {label,2}.
    {allocate,2,2}.
    {move,{x,1},{y,0}}.
    {move,{x,0},{y,1}}.
  {label,3}.
    {loop_rec,{f,5},{x,0}}.
    %% x0 = Msg, y0 = Timeout, y1 = CMS
    {test_heap,2,2}.
    {put_list,{x,0},nil,{x,0}}.
    %% x0 = [Msg]
    {move,{y,1},{x,1}}.
    %% x1 = CMS
    {call_ext,2,{extfunc,ets,match_spec_run,2}}.
    %% x0 = [] | [Res]
    {test,is_nonempty_list,{f,4},[{x,0}]}.
    {get_list,{x,0},{x,1},{x,2}}.
    {move,{x,1},{x,0}}.
    %% x0 = Res
    remove_message.
    {deallocate,2}.
    return.
  {label,4}.
    {loop_rec_end,{f,3}}.
  {label,5}.
    {wait_timeout,{f,3},{y,0}}.
    timeout.
    {move,{atom,timeout},{x,0}}.
    {deallocate,2}.
    return.

To test it, we have this module:

%% File: dyn_recv_test.erl
-module(dyn_recv_test).
-include_lib("stdlib/include/ms_transform.hrl").
-export([test/0]).

test() ->
    %% The pattern (and what it should result in):
    MS = ets:fun2ms(fun({ping, X}) -> {pong,X} end),

    %% Compile the pattern:
    CMS = ets:match_spec_compile(MS),

    %% Perform a selective receive based on the pattern:
    dyn_sel_recv:match_spec_receive(CMS, 1000).

A sample interaction, with comments marked with "#" and "%":

# Compile the code:
$ erlc dyn_recv_test.erl dyn_sel_recv.S

# Then start an Erlang shell:
$ erl
Erlang R14B (erts-5.8.1) [source] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.1  (abort with ^G)
1> dyn_recv_test:test(). % No appropriate messages - times out
timeout
% Fill the inbox with test data:
2> self() ! 'before'.
before
3> self() ! {ping, 144}.
{ping,144}
4> self() ! {ping, 1234}.
{ping,1234}
5> self() ! 'after'.
'after'
6> process_info(self(), messages). % There are four messages in the inbox now
{messages,[before,{ping,144},{ping,1234},'after']}
7> dyn_recv_test:test(). % Receive first ping message
{pong,144}
8> dyn_recv_test:test(). % Receive second ping message
{pong,1234}
9> dyn_recv_test:test(). % Times out again - no more ping messages
timeout
10> process_info(self(), messages). % Now, only the non-pings are left.
{messages,[before,'after']}

Discussion

Why restrict ourselves to matchspecs?

In principle you could instead construct a receive function which accepts a general predicate function, but that would be risky — while I'm reasonably confident that the matchspec hack works (no guarantees though!), I'm fairly certain that the Erlang VM would take offense if such a predicate were to e.g. do any receiving of its own, or throw exceptions.

Matchspecs, on the other hand, can do just that which is possible to do in a regular pattern matching construct in Erlang — in particular, no side effects are allowed.

And matchspecs can be preprocessed (“compiled”) into a form which is interpreted efficiently; yet their source form can be constructed with relative ease at runtime. Finally, matchspecs have the nice property that even though they are represented as ordinary Erlang terms, and as such may be somewhat hard to read (and write) when they are complex, there is a parse-tree transformation which makes it possible to write them just as ordinary Erlang patterns and conditions — this is demonstrated in the above.
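For instance, the matchspec behind the test module above can be inspected directly in the shell (which has special support for ets:fun2ms):

```erlang
1> ets:fun2ms(fun({ping, X}) -> {pong, X} end).
[{{ping,'$1'},[],[{{pong,'$1'}}]}]
```

The term on the second line is the source form which ets:match_spec_compile/1 then turns into its efficiently interpretable representation.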

Now, I haven't yet had the chance to use this hack in a real context, but here it is. If nothing else, I suppose it demonstrates that it is quite possible to allow matchspecs in Erlang's pattern-matching constructs — at least that there does not seem to be any VM-level reason against it. I think that might be an interesting language extension; then again, there are plenty of other language extension discussions, and this suggestion would probably not get high priority.

Monday, August 15, 2011

An Erlang-Java Interop Demo

Erlang is designed and used for distributed systems. As such, it is quite good at talking to itself. And as I'll show in the following, it is — at least in some cases — reasonably good at talking to other languages as well.

When would you want to do this?

(You can skip this section if you know.)

Interoperability between languages is useful when there's value to be had in the software in both ends. You might prefer to keep some functionality in Erlang because it's already written in that language; because it has better libraries for the task; or simply because the language supports that task better. The same goes for the other language: there are C libraries around for almost anything (and plenty of C/C++ legacy code), and for GUIs you might prefer e.g. Java.

Or you have the same functionality implemented in both languages, and wish to do comparison tests on the two implementations.

Or the architecture is inherently distributed — e.g. a client-server configuration — so that there is no advantage to be had in building the two parts using the same language anyway.

In the following demonstration, we have a Java side with some freshly written software with interesting bugs, and an Erlang side with an interesting tool for discovering bugs and a nice-ish language for specifying tests.

The demo

What I intend to do:

  • Define an interface;
  • Write a Java function;
  • Make it accessible from Erlang;
  • Test it using Erlang.

Sounds like a lot of work? It needn't be. Not for you, anyway; most of the heavy lifting has already been done.

(If you don't care about exposition, and are just interested in the technical end result, here is the executable summary.)

An interface

First, we'll need to define an interface. For this, we use the IDL — Interface Description Language:

// File "foo.idl"
interface Foo {
  string quuxinate(in string s);
};

We translate this into Java using the Erlang compiler:

erlc '+{be,java}' foo.idl

This results in a Java interface in Foo.java, as well as some stub code we'll be using.

A function in Java

For an implementation, let's say that quuxinate() is to reverse the string. To add a twist, let's say that it breaks down under some rare, complex-ish circumstances, such as when there's a letter which occurs thrice in the input string — that'll be a bug we can find later, when we start testing it:

// File "FooImpl.java"
public class FooImpl extends _FooImplBase /* which implements Foo */ {
  public String quuxinate(String s) {
    int[] stats = new int[256];
    for (int i=0; i<s.length(); i++) {
      char c =  s.charAt(i);
      if (c<255 && ++stats[c] >= 3) throw new RuntimeException("WTF");
    }
    return new StringBuilder(s).reverse().toString();
  }
}

Making the function accessible from Erlang

You may have noticed that we don't implement Foo directly, but rather extend a stub class which implements it. This means that the class already knows how to talk to Erlang — specifically, it knows how to handle requests like a gen_server.

What is left is to establish connection to an Erlang node. There are two frameworks for this: CORBA and Erlang inter-node communication. CORBA is, I believe, the heavy-weight option, and I haven't worked with it; in the following, we'll use Erlang inter-node communication.

A running Erlang program (OS-level process) is often referred to as a 'node', because Erlang is designed for having multiple such programs be connected, in what is known as a 'cluster' of nodes.

It's not an exclusive club, either: nodes can be, and usually are, Erlang nodes, but other kinds can enter the mix — a C or Java node, for instance.

A bit of code is needed to run a Java program as a node; don't worry though, this is the only lengthy bit, and it can be generalized so that it doesn't have to refer to Foo:

// File "FooServer.java"
public class FooServer {
  // The following is based on the program in lib/ic/examples/java-client-server/server.java
  static java.lang.String snode = "javaserver";
  static java.lang.String cookie = "xyz";

  public static void main(String[] args) throws java.io.IOException, com.ericsson.otp.erlang.OtpAuthException {

    com.ericsson.otp.erlang.OtpServer self = new com.ericsson.otp.erlang.OtpServer(snode, cookie);

    System.err.print("Registering with EPMD...");
    boolean res = self.publishPort();
    if (!res) throw new RuntimeException("Node name was already taken.");
    System.err.println("done");

    do {
      try {
        com.ericsson.otp.erlang.OtpConnection connection = self.accept();
        System.err.println("Incoming connection.");
        try {
          handleConnection(connection);
        } catch (Exception e) {
          System.err.println("Server terminated: "+e);
        } finally {
          connection.close();
          System.err.println("Connection terminated.");
        }
      } catch (Exception e) {
        System.err.println("Error accepting connection: "+e);
      }
    } while (true);
  }

  static void handleConnection(com.ericsson.otp.erlang.OtpConnection connection) throws Exception {
    while (connection.isConnected()) {
      FooImpl srv = new FooImpl();
      com.ericsson.otp.erlang.OtpInputStream request= connection.receiveBuf();
      try {
        com.ericsson.otp.erlang.OtpOutputStream reply = srv.invoke(request);
        if (reply != null) {
          connection.sendBuf(srv.__getCallerPid(), reply);
        }
      } catch (Exception e) {
        System.err.println("Server exception: "+e);
        e.printStackTrace(System.err);
        handleException(e, connection, null);
      }
    }
  }

  static void handleException(Exception e, com.ericsson.otp.erlang.OtpConnection connection, com.ericsson.otp.ic.Environment env) throws Exception {
    // We'll improve on this later...
    throw e;
  }
}

Time to build and try out:

ERL_ROOT=`erl -noshell -eval 'io:format("~s\n", [code:root_dir()]), init:stop().'` # Or simply where Erlang is.
IC_JAR=`ls -1 $ERL_ROOT/lib/ic-*/priv/ic.jar`
JI_JAR=`ls -1 $ERL_ROOT/lib/jinterface-*/priv/OtpErlang.jar`
CLASSPATH=".:$IC_JAR:$JI_JAR"
javac -classpath "$CLASSPATH" *.java

epmd # Start Erlang port mapper daemon if it isn't already.
java -classpath "$CLASSPATH" FooServer

The Java node should now be running and registered as javaserver — ready to accept connections with the right cookie.

Let's test that:

erl -sname tester -setcookie xyz
> {ok,Host}=inet:gethostname().
> JavaServer = {dummy, list_to_atom("javaserver@"++Host)}.
> gen_server:call(JavaServer, {quuxinate, "Testing, 1-2-3"}).

The reply should be the reverse string:

"3-2-1 ,gnitseT"

And the bug we put there is working too:

> gen_server:call(JavaServer, {quuxinate, "Testing, 1 2 3"}).
** exception exit: {{nodedown,javaserver@flitwick},
                    {gen_server,call,
                                [{dummy,javaserver@flitwick},
                                 {quuxinate,"Testing, 1 2 3"}]}}
     in function  gen_server:call/2

Property testing of Java code

Now then, what can we do with this setup?

One interesting thing that we can do is to apply one of the property-based testing tools to our Java function. In the following, I'll be using Triq, but Quviq QuickCheck or PropEr could be substituted with only minor changes. First, assuming that you haven't got Triq, but have git:

git clone git://github.com/krestenkrab/triq.git
(cd triq && ./rebar compile)

Then we are ready to write our test — namely, that given any (ASCII) string, quuxinate() should return the reverse string:

%% File "test.erl"
-module(test).
-include_lib("triq/include/triq.hrl").
-export([main/0]).

prop_reverse(JavaServer) ->                  % The property
  ?FORALL(S, ascii_string(),
      gen_server:call(JavaServer, {quuxinate, S})
      == lists:reverse(S)).

ascii_string() ->                            % A data generator
  list(choose(0,127)).

main() ->
  {ok,Host}=inet:gethostname(),
  JavaServer = {dummy, list_to_atom("javaserver@"++Host)},

  triq:check(prop_reverse(JavaServer), 100), % Do the magic
  init:stop().                               % Shut down cleanly

Compile and run the test:

erlc -I triq/include -pa triq/ebin test.erl

erl -noshell -sname tester -setcookie xyz -pa triq/ebin -run test main

Triq will now generate a hundred random test cases, and verify the property for each case. After a dozen such tests, the bug is triggered:

...........Failed with: {exit,
                 {{nodedown,javaserver@flitwick},
                  {gen_server,call,
                      [{dummy,javaserver@flitwick},
                       {quuxinate,
                           [79,84,75,110,3,42,73,14,1,53,76,42,126,40,118,122,
                            74,2,58,34,42,98]}]}},
                 [{gen_server,call,2},
                  {test,'-prop_reverse/1-fun-0-',2},
                  {triq,check_input,4},
                  {triq,check_forall,6},
                  {triq,check,3},
                  {test,main,0},
                  {init,start_it,1},
                  {init,start_em,1}]}

Failed after 12 tests with {'EXIT',
                            {{nodedown,javaserver@flitwick},
                             {gen_server,call,
                              [{dummy,javaserver@flitwick},
                               {quuxinate,
                                [79,84,75,110,3,42,73,14,1,53,76,42,126,40,
                                 118,122,74,2,58,34,42,98]}]}}}

The 22-character string has three '*'s (ascii value 42) in it, which was what triggered the bug.

Triq then proceeds to simplify the test case:

Simplified:
        S = [33,33,33]

concluding that the string "!!!" is a locally-minimal failing test case.

Quite useful, isn't it? — and the amount of non-reusable code has been quite manageable (3 lines of IDL, 14 lines of implementation, 14 lines of test code, 1 line of Foo-specific code in FooServer).
(Speaking of which: you're free to use these snippets as you see fit; provided as-is and with no guarantees of anything, of course.)

I should mention at this point that property testing tools exist within the Java world as well — there exists at least one for Scala. I didn't find it particularly satisfying to use, though I may have been unlucky; in any case, it is an alternative.

Erlang, IDL and exceptions

The Guide (that is, the Erlang IC (IDL compiler) User Guide) has this to say about handling of Java exceptions:

While exception mapping is not implemented, the stubs will generate some Java exceptions in case of operation failure. No exceptions are propagated through the communication.

Which means that, out of the box, these are our options for handling Java-side exceptions:

  1. Convert exceptions into values within quuxinate().
  2. Return no result on exceptions — in which case the Erlang-side call will time out (after, as a default, 5 seconds).
  3. Close the connection — in which case the Erlang-side call will receive an error result immediately.

None of these options are especially satisfying.

So let's look at how to do this:

  1. Report exceptions to the caller as an {error, Reason} reply.

It's not too difficult. What we need to do (according to the gen_server call protocol) is send a {RequestRef, Reply} tuple to the caller, where RequestRef is a reference which was included in the request and must be included in the response as well.
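On the Erlang side, this is exactly what gen_server:reply/2 does; a sketch of the protocol in Erlang terms (illustrative names, and the {CallerPid, Ref} shape of From is as it was at the time of writing):

```erlang
%% The gen_server call protocol, seen from Erlang: send {Ref, Reply}
%% to the caller. From = {CallerPid, Ref} is what a gen_server
%% callback receives; gen_server:reply(From, Reply) does the same.
send_error_reply({CallerPid, Ref}, Exception) ->
    CallerPid ! {Ref, {error, Exception}}.
```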

The main difficulty is one of access: at the point of error handling, we will need access to (1) the caller's PID and (2) the request reference. I'd like to keep FooImpl and the connection-managing FooServer separate, so we need to add an accessor:

// File "FooImpl.java"
public class FooImpl extends _FooImplBase /* which implements Foo */ {
  ...
  /** The request environment is exposed for error handling reasons. */
  public com.ericsson.otp.ic.Environment getEnv() {
     return _env;
  }
}

That's it. We could instead have added one accessor for the caller PID and one for the request reference, but let's keep the complexity in the server class.

In the server class, instead of throwing an exception which causes the connection to be terminated, we will build and send an error reply:

// File "FooServer.java"
public class FooServer {
  ...
// in handleConnection(), replace
        // handleException(e, connection, null);
// with:
        handleException(e, connection, srv.getEnv());
  ...
// and replace handleException() with:
  static void handleException(Exception e, com.ericsson.otp.erlang.OtpConnection connection, com.ericsson.otp.ic.Environment env) throws Exception {
    // Write exception reply:
    com.ericsson.otp.erlang.OtpOutputStream err_reply = new com.ericsson.otp.erlang.OtpOutputStream();
    err_reply.write_tuple_head(2);
    err_reply.write_any(env.getSref());
    err_reply.write_tuple_head(2);  // Construct return value {error, ErrorText}
    err_reply.write_atom("error");
    err_reply.write_string(e.toString());
    connection.sendBuf(env.getScaller(), err_reply);
  }
}

There; that's all.

If we test it again, we get a nicer behaviour:

> gen_server:call(JavaServer, {quuxinate, "Testing, 1 2 3"}).
{error,"java.lang.RuntimeException: WTF"}

Disclaimer

I ought to mention that I have only just discovered this interoperability option (two days ago, in fact, when I stumbled upon this article); only a few months ago, I wrote a custom socket server test program in Java, and corresponding driver code in Erlang, just to achieve what could be had far more easily using Erlang's IDL compiler. It may therefore not be as easy to use in practice as the above makes it seem. (For one thing, the only type that I've used is string.)

Even so, I hope to have inspired others to try this out.
One way to do so is to have a look at the executable summary referred to earlier; it is the demo as a shell script.

Happy inter-language hacking!


Further technical notes (and speculation)

The IDL compiler supports generating code for Java, C and Erlang. Here is its documentation.

If you're writing Erlang port programs (like an Erlang driver, but running in a separate process), but have an interface which is evolving quickly enough, or you yourself are lazy enough, that writing and maintaining the serialization and deserialization code has quite lost its attraction, then it might be tempting to look into using IDL for that interface, and have the boring bits code-generated for you.

The IDL compiler and the code it generates do not seem to have been designed for this, but as far as I can tell it appears to be achievable.

Friday, June 3, 2011

Concurrent Design as a Matter of Cause

I've written earlier about designing for concurrency in the small.
But even the best and most flawless bricks can be put together in far more meaningless ways than meaningful ways.
In the following, I'll consider concurrent design on a larger scale, and introduce a tool which may be useful to ensure soundness in a design.

What is concurrent design about? (Performance aside — we'll focus on soundness for the moment.) What are the basic units for reasoning about concurrent, possibly distributed systems?
Mutexes, messages, transactions — these are among the building blocks. But the connecting mortar is causality chains. When a database client, for instance, submits data to a database server, it can rely on the data to be persisted only if there is a causality chain leading from the moment the client receives the "success"-response, back to when the database sent it, and further back to when the database server's hard drive physically wrote the last block of the transaction.
If there is no causality chain, then no timing assumptions can be made. Which is why, in a given concurrent design, it is prudent to ensure that the necessary causality chains are present.

Example: the "update take-over" pitfall

This is a concurrency design pitfall which I learned about a few years ago. I'd forgotten all about its non-obviousness, until it came up recently in a design discussion. This incident suggested to me that the problem might be sufficiently non-trivial for there to be a lesson to pass on. And I will do that — describe the problem, the naïve-but-wrong solution, and some correct solutions — but do so a bit more elaborately than I originally planned, because the focus will not be on the solutions themselves, but rather on the process: a method to arrive at them.

The scenario: A client (C) must keep track of some object's state. It does so by subscribing to changes (updates). However, there are two update sources, and it is desirable to change over from one (A) to the other (B) from some point on. That is, before the take-over, A provides all updates; after the take-over, B provides all the updates.
A common variation of the pattern is that A is providing snapshots of the object's state through a synchronous request-response protocol, rather than providing updates through a subscribe-publish mechanism.
For simplicity, this variation is the scenario I'll be focusing on in the following; it is depicted as UML to the right.

The naïve solution: Simply set up the subscription to B before getting the snapshot from A.

The threat: An update is missed because it "falls into the crack" caused by the take-over — it is processed by A too late to be part of the snapshot, but is processed by B too soon, before the subscription is set up.
The core of the issue is that when there are two independent message paths, we cannot assume anything about their relative timing; even though they have a common source, events in one path may overtake events in the other path. 

Analysis:
What does it take to get this setup to work as expected?
Any update must reach the client, either through A (the snapshot) or B (the following updates).
Or, from a causality view:
There must be a causality chain ruling out "update is processed by A after the snapshot is made, but is processed by B before the subscription is set up".
A causality chain is a series of causality links, which are defined as follows:

Causality rules

  1. There is a causality link from X to Y if X happens before Y and they both happen in the same thread.
  2. There is a causality link from the sending of a message to its reception.
  3. Two message receptions may be causally linked if the transport layer guarantees that one is delivered before the other.
    For instance, TCP guarantees that messages within the same connection are delivered in order, i.e. that if X is sent before Y (and in the same direction), then X is delivered/received before Y.
    Similarly, in Erlang, message delivery order is guaranteed for any sender/receiver pair.

Exercise/Kata: I think this problem makes a rather nice kata in concurrent design.
If you'd like to try it yourself, do so before reading on.
How many different solutions do you see?

Enter the loop

Here is the trick: Causality loops are impossible. You can't enter a causality loop; they correspond to paradoxes. (Proof by paradox is an ancient technique, also known as proof by contradiction.)
Thus, one way to attack the problem is to reformulate the requirement as follows:
If update is processed by A after the snapshot is made, and the same update is processed by B before the subscription is set up, then there is a causality loop.
The loop in question is absent, however, if either of the two sub-conditions is absent.
The situation can be represented graphically:

This is quite a bit simpler than the UML diagram above. But actually it captures the essence of the problem.
And it is useful, because it leads to a further refinement of the problem statement: How can we introduce a loop which is absent if we reverse either of the two arrows?
We will, however, also need the context in order to translate our findings back into concrete solutions. The context looks like this:

Getting solutions through loops

That question is not too hard: simply add A2→B1 and B2→A1, to get a four-edge loop which disappears if any edge is reversed or removed.
Translating backwards, what does this mean in terms of the original problem?

Solution 1:
The client subscribes synchronously to B before getting the snapshot from A. The event origin publishes updates synchronously to A before it sends them to B.
Now, the first part of that solution is reasonable, but the second part is often not directly achievable — the event origin may just have a generic publish-subscribe scheme, and be oblivious to the difference between the nature of A and B.
But there is another interpretation of the A2→B1 edge.
It can be realized by linking A2 or anything "downstream" (causally later) from A2 to B1 or anything "upstream" (causally earlier) from B1.
By using a direct A2→B1 link, we obtain:

Solution 2: The client subscribes synchronously to B before getting the snapshot from A. When A has processed an update, it sends it on to B (which gets the updates by this route, rather than directly from the event source).

Likewise, the B2→A1 edge can be realized by a direct link. This results in another two solutions:

Solution 3: The client does not contact A directly in order to get a snapshot.  Instead, the 'subscribe' operation provided by B both subscribes and obtains a snapshot from A.

Solution 4: Like in #2, updates are sent only to A, which sends them on to B; like in #3, the client contacts only B, which both registers a subscription and requests a snapshot for the client from A.

Merging nodes

Revisiting Solution 2, we find that, depending on the situation, this solution may render B somewhat moot. It may make sense to eliminate it altogether, which is yet another interpretation: A2 occurs in the same thread as B1, and B2 occurs in the same thread as A1.
In terms of causality graphs, this is a correct solution because merging A1 with B2 and B1 with A2 results in a two-edge loop:

Solution 5: The snapshot provider also plays the role of update publisher. This combined service provides a "get snapshot and subscribe for updates" operation.

In general, merging existing nodes may provide additional ways to obtain causality loops.
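The loop-hunting above can even be mechanized. Here is a minimal Java sketch (the graph class and its API are my own invention, not from any real tool): causality edges go into a directed graph, and a depth-first search checks whether the feared scenario's edges plus a candidate solution's edges close a loop.

```java
import java.util.*;

/** Directed graph of causality edges, with a simple DFS cycle check. */
class CausalityGraph {
    private final Map<String, List<String>> edges = new HashMap<>();

    void addEdge(String from, String to) {
        edges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        edges.computeIfAbsent(to, k -> new ArrayList<>());
    }

    boolean hasCycle() {
        Set<String> visited = new HashSet<>(), onStack = new HashSet<>();
        for (String n : edges.keySet())
            if (dfs(n, visited, onStack)) return true;
        return false;
    }

    private boolean dfs(String n, Set<String> visited, Set<String> onStack) {
        if (onStack.contains(n)) return true;   // back edge: found a loop
        if (!visited.add(n)) return false;      // already fully explored
        onStack.add(n);
        for (String m : edges.get(n))
            if (dfs(m, visited, onStack)) return true;
        onStack.remove(n);
        return false;
    }

    public static void main(String[] args) {
        CausalityGraph g = new CausalityGraph();
        // Edges of the feared scenario:
        g.addEdge("A1", "A2");  // snapshot made before update reaches A
        g.addEdge("B1", "B2");  // update reaches B before subscription set up
        // Edges contributed by Solution 1:
        g.addEdge("A2", "B1");  // A sees updates before B does
        g.addEdge("B2", "A1");  // client subscribes before requesting snapshot
        // The four edges close a loop, so the feared scenario is impossible:
        System.out.println(g.hasCycle());  // prints: true
    }
}
```

Reversing or removing any one of the four edges makes `hasCycle()` return false, which is exactly the check the graphical argument performs by hand.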

Solutions involving conditional edges

We now have usable solutions, but our options haven't been exhausted yet.
There are other ways of causing loops in the graph — but in order to ensure that the loop is only present when it should be, we will need to introduce conditional edges — which are only actually there in certain circumstances.

For instance, consider the loop caused by adding the edge A2→A1. In order to ensure that this loop is absent if the B1→B2 edge is reversed, we will need to put a condition on the new edge saying "only if the update at A2 has already happened at B before the subscription at B2." This means that information from the subscription operation is required to evaluate the condition, so we'll need a B2→A2 edge as well:
Also, we will need the updates to be identifiable in some way, by timestamp, serial number or similar; let's assume serial numbers.

Just as Solutions 1 and 2 differed in how the A2→B1 edge was implemented — whether it was present as a link directly from A to B or from A via the event source to B — so there are at least two solutions corresponding to this graph, because an A2→A1 edge can be realized both directly and as A2→C1 (because C1 is upstream of A1).
The A2→C1 edge involves the client:

Solution 6: Subscribing to B is a synchronous operation, which returns the ID of the last known update (i.e., the last update not published as a result of the subscription). The "get snapshot" operation provided by A returns both the snapshot and the ID of the last update included in the snapshot. The client first performs the subscription, then requests snapshots repeatedly, until the snapshot includes the last update not published.
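The client side of Solution 6 can be sketched in a few lines of Java (the interfaces and names here are invented for illustration; updates are assumed to carry increasing serial numbers): subscribing returns the last update ID known to B, and the client polls A until the snapshot has caught up.

```java
/** Sketch of Solution 6; all names are invented for the example. */
class Solution6 {
    static class Snapshot {
        final String state;
        final long lastUpdateId;  // ID of the last update included
        Snapshot(String state, long lastUpdateId) {
            this.state = state;
            this.lastUpdateId = lastUpdateId;
        }
    }
    interface SnapshotProvider { Snapshot getSnapshot(); }          // plays "A"
    /** Subscribing returns the ID of the last update B knows of. */
    interface UpdateProvider { long subscribe(Runnable handler); }  // plays "B"

    /** Subscribe first, then poll A until the snapshot has caught up. */
    static Snapshot takeOver(SnapshotProvider a, UpdateProvider b) {
        long lastSeenByB = b.subscribe(() -> { /* handle updates */ });
        Snapshot snap;
        do {
            snap = a.getSnapshot();          // repeat until the snapshot
        } while (snap.lastUpdateId < lastSeenByB);  // covers B's last update
        return snap;
    }
}
```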

In the direct variant, the A2→A1 edge is kept within A; if sending a snapshot is expensive, this is probably better:

Solution 7: Subscribing to B is a synchronous operation, which returns the ID of the last known update. The "get snapshot" operation takes an update ID and checks it against the last known (by A) update. Sending the snapshot is delayed until the update in question has been processed by A.

So much for that causality loop; moving on, we consider the loop caused by adding a B2→B1 edge. It also needs to be conditional, with the same condition; this time, we need to add an A1→B2 edge to have the information required to evaluate the condition:
In domain terms: "If the update is not in the image when subscribing, then reprocess the update."
This seems to imply memory:

Solution 8: The update provider B caches a suitable amount of the most recent updates. The "get snapshot" operation returns both the snapshot and the ID of the last update included therein. The subscription operation takes an update ID, and results in all updates since that one to be sent to the client — whether they have already been processed before the subscription took place (in which case the update is taken from the cache), or arrive at B at a later point. The client first gets a snapshot synchronously, then subscribes with the ID thus obtained.
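The update provider of Solution 8 might be sketched as follows (again, the class and its API are my own invention): B keeps a bounded cache of recent updates, and a subscription carrying a "since" ID replays the relevant cached updates before switching to live delivery.

```java
import java.util.*;
import java.util.function.LongConsumer;

/** Sketch of Solution 8's update provider B; names are invented. */
class CachingPublisher {
    private final Deque<Long> cache = new ArrayDeque<>();  // recent update IDs
    private final int capacity;
    private final List<LongConsumer> subscribers = new ArrayList<>();

    CachingPublisher(int capacity) { this.capacity = capacity; }

    /** Publish an update (represented here by its serial number only). */
    void publish(long id) {
        if (cache.size() == capacity) cache.removeFirst();  // bounded cache
        cache.addLast(id);
        for (LongConsumer s : subscribers) s.accept(id);
    }

    /** Subscribe; cached updates with ID > sinceId are replayed first. */
    void subscribe(long sinceId, LongConsumer handler) {
        for (long u : cache)
            if (u > sinceId) handler.accept(u);
        subscribers.add(handler);
    }
}
```

The client first gets a snapshot (obtaining, say, last-included ID 2), then calls `subscribe(2, handler)`; updates already processed by B are served from the cache, later ones arrive live, and nothing falls into the crack as long as the cache is deep enough.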

There are more (of course there are)

The set of solutions found so far is obviously not complete; for instance, none of the solutions we've considered have involved reinventing even a quarter of Common Lisp (or Erlang).
Further meaningful solutions exist; for instance, one that lets the snapshot provider send updates until the real update provider has taken over (which resembles a hybrid between #5 and #7); or one in which the snapshot provider tries to include a given update ID (as in #7) but times out if this takes too long, in which case the client may repeat the snapshot operation (as in #6). Or one in which the "get snapshot" operation is replaced with a "get snapshot if update u is included" (also a hybrid between #6 and #7).

Conclusion

We have looked at a concurrent design problem in the view of causality chains, and attacked it using the concept of causality loops (and with the help of causality graphs). Using a fairly simple technique, we have generated a number of solutions — quite different solutions, as it turns out, but all correct and realistic, and with different trade-offs.
Causality loops as a design tool appears to be a useful way both to find correct solutions and to explore and outline the design space for concurrent design problems.

Acknowledgement

I would like to thank Kresten Krab Thorup, who unknowingly became the cause of this posting, and indirectly of many of the ideas in it.

Monday, May 23, 2011

On concurrency issues

Concurrency issues — race conditions and the like — are the worst category of bugs. These are the bugs that cannot well be proven absent by unit tests; these are the kind of bugs that hide away, biding their time until the most inopportune moment, then rearing their ugly, non-deterministic head on your production system when it is at its busiest. And even then, they can continue to exist unlocated for quite a while, despite many hours being put into tracking them down. Elusive, hardly reproducible, yet ultimately expensive; I've seen it happen more than once.

What follows are some thoughts on the nature of the typical issues.

Your basic multi-threading bug
As any course on multi-threaded programming will tell you, when multiple threads of execution are to run concurrently, care needs to be taken or there will be a risk of data corruption and/or unintended results.

More specifically, the threat (a "race condition" or "data race") is present when:
  1. one thread modifies the state of an object
  2. at the same time as
  3. another thread accesses the object.
That is: it takes a coincidence, a conjunction of three conditions.
Let's analyze it...:

Analysis

Another way of stating the above is obtained by reversing the statement:
We can avoid concurrency issues by always making sure that any object either
  1. is never modified; or
  2. is never accessed by two threads simultaneously (or stronger: never written to while accessed otherwise); or
  3. can only ever be accessed by one thread.
Such objects are known as, respectively,
  1. Immutable objects.
  2. Objects with state protected by a mutex (synchronization lock; monitor).
and the one used less often:
  3. Single-thread objects — or even stronger: objects of linear type.
The first two options are well-known; personally, I'm increasingly coming to consider the strong third option — objects with enforced linear lifecycle — to be rather overlooked, language design space-wise.
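In Java terms, the three categories might look like this (an illustrative sketch; the run-time ownership check in the third class is a crude stand-in for what a linear or ownership type system would enforce statically):

```java
// 1. Immutable: all fields final, no mutators -- safe to share freely.
final class Point {
    final int x, y;
    Point(int x, int y) { this.x = x; this.y = y; }
    Point moved(int dx, int dy) { return new Point(x + dx, y + dy); }
}

// 2. Mutex-protected: every access to the state goes through the lock.
class Counter {
    private long n = 0;
    synchronized void inc() { n++; }
    synchronized long get() { return n; }
}

// 3. Thread-confined: ownership is checked, in the spirit of Erlang's
//    private ETS tables (here only at run time, not by the type system).
class Confined {
    private final Thread owner = Thread.currentThread();
    private long n = 0;
    void inc() {
        if (Thread.currentThread() != owner)
            throw new IllegalStateException("accessed by non-owner thread");
        n++;
    }
}
```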

"Concurrency-ready" metric
One possible metric for how well a programming language is designed to express complex concurrent systems is, then, how easy it is to enforce that all objects fall into one of the three categories.

Languages and concurrency

Let's apply this view to a few programming languages. The two languages I've used most recently are Java and Erlang:
Erlang
In Erlang, there are the following kinds of objects: terms, message queues, process dictionaries and other process metadata, ETS tables, ports (files and drivers).
  • Terms are immutable values. They may contain handles of other kinds of objects, but the handles themselves are also immutable.
  • Certain objects — private ETS tables, and to some extent ports — are single-thread objects.
  • The rest — message queues, public ETS tables, process metadata etc. — are mutex-protected.
Single-thread objects are enforced not to be used by other threads than the owning one.
In Erlang, low-level data races only occur if there's a bug in the Erlang run-time system, or if you write your own driver and include one.

Java
In Java, you could argue both that there are fewer kinds of object — just one, really, the class-file defined kind — and that there is a much wider range of object kinds. There are certainly thousands of classes, defined by one well-defined scheme, which includes just a few mechanisms relevant to concurrency.
But the problem here is the number of classes — because it is at the class level that it is ensured that the corresponding objects will be thread-safe. A well-designed and -implemented class may be thread-safe, in that it is either immutable or uses appropriate inter-thread synchronization (and encapsulation) to ensure thread-safety. A less well-written class may rely implicitly on details of the context in which it is used, and really be single-thread-use only, or multi-thread usable only in certain unstated conditions.
Java is one of the few languages actually designed for portable multi-threaded programs; in particular, it has explicitly stated semantics wrt. multi-threaded execution. However, as the above comparison is one indication of, it has its shortcomings. For a language to claim good concurrency support, it should provide mechanisms for good, usable guarantees to be derived from local context (e.g. "we know that this-and-this property always holds, because these few lines here ensure that it does").
I've expounded earlier on the importance of supporting local reasoning. As it happens, Java did also then get some criticism (sorry, Java, but you're the modern main-stream language I happen to know the best...).

A Java exercise
Imagine that you have in front of you the source code of a Java class.
A quick inspection reveals that all methods are declared "synchronized".
What kinds of thread-safety issues might the class yet have? Try to list at least three ways in which the class may be non-thread-safe.
I'll present my list at the end of this article.

Does it matter?

But building security against these issues into a language is not exactly trivial, you might argue. Are stricter language rules, additional compiler analysis, and/or costly run-time support just for preventing concurrency issues not overkill?
Sure, the trend is towards increasing parallelism and so on, but we have done quite fine without such extra measures so far, haven't we?
And bondage-and-discipline languages have been out of fashion for a while. Dynamically typed languages are as popular as ever!

The difference is this:

You can easily live and work with the relative uncertainties of dynamic typing — but then, you can unit test and get some confidence that the types match. If something is broken, or breaks later, then there's a good chance it'll be caught.
For many concurrency issues, unit tests are not likely to catch any errors. Furthermore, the necessary invariants aren't local — they are often widely dispersed in the code. To convince yourself that the code is correct, you more or less need to keep it all in your head at once. That, combined with missing language support for documenting and/or enforcing vital non-local invariants, means that they will perhaps not be communicated to whoever makes the next change in the code, who will therefore not have the full picture necessary to keep things correct.

Rather than being caught at the next full test suite run, or at least quite soon after the rubber hits the road, here's what happens to a race-condition bug:
  • The program may appear to work most of the time.
  • The issue will tend to manifest itself at the most inopportune moment: Not during development or testing (unless explicit and considerable effort is taken to stress-test against such issues), but in production, when your servers are at their busiest, or on the desktop of your busiest client.
  • Often, what clues you have amount to little besides "There almost certainly is an issue, it presumably is a software bug, the issue is probably in our code — and it occurs seldom, so it's likely to be a concurrency issue of some kind."
  • Replicating the issue may be difficult; under the exact same circumstances, the program may run just fine.
    Indeed, if the problem does manifest itself during development, it'll appear to have gone at the next run.
    The issue may even be technically impossible to reproduce on some machine architectures, because it requires multiple physical processors of the right kinds to manifest itself (and your servers or other production environment is less likely than the development machines to be thus bug-resistant).
  • Eliminating a tricky concurrency bug can be a drawn-out experience in all phases — detecting it, reproducing it, tracking down its root cause, verifying that it has gone — all steps tend to be markedly more difficult than for deterministic bugs.
  • With any kind of bug, tracking it down once is one thing; there may even be a feeling of gratification once you've done it. Tracking the same bug down twice is another matter — it is deeply frustrating to realize that there's a reason for your feeling of déjà vu. This is one of the reasons for regression testing: bug hunting may be stimulating in its own way, but it'd better lead to a different bug each time.
    This goes doubly (well, even more than that) for the elusive non-deterministic bugs.
    Sadly, because of their nature it is at best difficult, and often near impossible, to write regression tests for these kinds of issues.

Conclusion

Languages differ in concurrency support. Nothing new about that, of course, but I think it likely that many developers using one of the mainstream languages with a relatively good level of support in that area may not know that there are alternatives which are significantly more concurrency-ready.
In the beginning of this text, the prerequisites for a race condition were broken down and three kinds of conditions for avoiding race condition were derived; based on this I suggested a qualitative metric for a language's level of support for concurrent programming. I hope to have demonstrated that it may be a useful way of looking at both languages and concrete programs or program designs.

In the absence of a programming language providing strong, local guarantees with respect to thread-safety, as a developer you need to be alert whenever there's a chance that two threads may be executing your code concurrently. The best way of doing this is probably through discipline — for instance, by clearly constructing each class so that it falls into one of the above categories: Immutable, thread-safe through synchronization, or single-thread use only — and then using them strictly according to this. That is one way of trying to restore local reasoning.

Do you write programs involving multiple threads? If so, are you familiar with which consistency guarantees your platform actually provides (e.g., the Java Memory Model)?
Do you regularly stress-test your program on a system with multiple physical processors? I hope you do.
Dealing with concurrent programs in general requires good global, combinatorial-temporal reasoning abilities. Probably not your best-developed cognitive mode... :-)
The solution? Keep things as simple as possible. Find rules that work, and follow them. Encapsulate the issues, so that you can deal with just one question at a time. If possible, let the rules be checked mechanically.


Answer to Java exercise
Some ways in which a Java class may be unsafe even though all methods are declared "synchronized":
  1. A field is public, and either
    1. Non-final (and non-volatile) or
    2. Referring to a non-thread-safe object.
  2. The superclass is non-thread-safe, and the current class does not or can not override the causes.
  3. An instance method modifies the value of a static variable without proper synchronization.
  4. An instance method reads the value of a static variable which is also modified by a static method, without proper synchronization.
  5. A method exposes a reference to a non-thread-safe object referred to directly or indirectly by the current object.
    1. By returning such a reference.
    2. By passing such a reference to some method which stores the reference somewhere accessible to another thread.
    3. By starting a new thread and giving it such a reference.
    (I.e., a non-thread-safe object becomes shared.)
  6. A field (which may be private) refers to a non-thread-safe object which may be shared because
    1. It originated as (or was extracted from) a parameter to a constructor
    2. It originated as (or was extracted from) a parameter to a method
    3. It was returned from a call
    (I.e., a non-thread-safe object is already unexpectedly shared.)
  7. An inner class accesses an instance variable of the surrounding class, but fails to synchronize on the right this.
  8. An instance method locks on another object which may be of the same class (this may result in a deadlock)
    1. Implicitly, by calling a (synchronized) method on the other object
    2. Explicitly, using "synchronized"
    (A plausible example of this is a synchronized equals() method.)
  9. A constructor leaks this to some place where it can be accessed by another thread, and the object has at least one (final) variable which is accessed without synchronization.
    (This is because the special rule for final fields, which allows them to be accessed without synchronization, only applies after the constructor has completed.)
  10. A method exposes a reference to a thread-safe object referred to directly or indirectly by the current object, and that thread-safe object allows mutation (of itself or of a contained object) in a way that the current class is not prepared for.
This list is quite likely not complete; it is not the result of a systematic analysis of the matter.
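To make item 5.1 concrete, here is a small sketch (class and method names are mine): every method is synchronized, and yet the class is not thread-safe, because it hands out a live reference to its internal HashMap.

```java
import java.util.*;

/** All methods synchronized -- yet the class leaks its internal,
 *  non-thread-safe map (item 5.1 in the list above). */
class Registry {
    private final Map<String, String> data = new HashMap<>();

    public synchronized void put(String k, String v) { data.put(k, v); }

    // Leak: callers get the live HashMap and can mutate it without the lock.
    public synchronized Map<String, String> entries() { return data; }

    // A safer variant hands out a snapshot copy instead:
    public synchronized Map<String, String> entriesCopy() {
        return new HashMap<>(data);
    }
}
```

Once the internal map has escaped, another thread can read or write it concurrently with `put()`, with no synchronization anywhere in sight at the call site.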

Monday, May 16, 2011

Testing distributed-store algorithms

This is a follow-up to my post on a datastructure for storing collections in Riak.
While I have been planning this follow-up, Kresten Krab Thorup has actually gone ahead and implemented the algorithm (on GitHub; see src/riak_column.erl) — which suits me nicely, as I haven't written a single line of implementation and am not really past the thinking stage yet :-)
To wit:

Shortly after writing the post, I discovered a couple of issues or finer points, both of which have to do with the fact that different rows in Riak live independent lives — they are independently versioned, and no cross-row guarantees are given. Specifically, nothing can be concluded from the read order: if one client A updates row X and then row Y, another client B may see the Y update and later see the old version of row X.

(If Riak is set up with appropriate consistency settings, such that the majority of the replicas of an object are written synchronously, then you do have indirectly some sort of guarantee. Below a certain threshold of machine and/or network problems, that is.
But even so, nothing can be concluded from the read order if there are several clusters set up with cluster-to-cluster replication: changes on different keys in one cluster arrive at the other clusters in arbitrary order.)

This leads to at least the following issues:
  1. Firstly, when splitting a row into two, I included this step: "Write an empty row back under the original auxiliary row key".
    In fact, doing that would be wrong. Even writing some tombstone dummy value in the old row would be a bad idea; instead, one should simply mark the row as obsolete while keeping the old data. This is necessary because a client accessing the collection later on may see from the master row that the old row has been split, but not see the new rows. Or it may see the old row, but not the change in the main row. In the former case, it is necessary that at least the values in the old row be available (or the collection would suddenly have shrunk); in the latter case, it would not be apparent that the values in the old row are obsolete.

  2. Secondly, after modifying an auxiliary row, the main row should be updated — even if nothing in it has changed. This may sound silly, but it is the easiest way to ensure that, in the event of a concurrent row split, the auxiliary row in question is taken into account at a subsequent merge (indeed, that the merge is triggered at all).

  3. And even that is not enough if there are more than one cluster: At the time of the read repair of the main row, the update for the auxiliary row may not have arrived - and it may therefore be lost silently. It appears that some kind of versioning of the auxiliary rows is necessary; with such versioning, we can tell when repairing the main row that we're missing an update on the pre-split auxiliary row, and that the reference to it should therefore be kept in the main row, so that it can be repaired later when all information is available.
Ah, the perils and challenges of incomplete information.

Managing concurrent subtlety
As the saying goes,
Meddle not in the affairs of concurrent systems for their ways are subtle, and are quick to anger.
--
Tolkie(error: timeout)n
In a domain as subtle as this — with gotchas in the style of the issues mentioned above — how can we ever convince ourselves that a scheme like the one for Riak collections is implemented robustly and correctly?

As always, there are two ways: Formal verification — proving that the program is correct; and thorough testing — gaining confidence by exercising the program.
Both have their advantages and disadvantages.

If I were Dijkstra, I'd develop a formal proof (as he did in quite a few of his blog postings), and probably modify the algorithm appropriately along the way. I, however, am no Dijkstra, and do not believe I have the time to develop a formal proof — nor do I have any delusion of being able to write anything worth reading in the process. Luckily, going the other way, through testing, has something going for it as well:
  • It is not always clear which guarantees the components we build on actually provide. That means that, regarding formal proof, the set of axioms may be uncertain.
  • The same test can be applied to different implementations of a component.
  • The same test can be applied to different versions of a component — i.e. if we modify the code, we can cheaply gain some confidence in the modified version.
For both formal verification and testing, the answers you get depend on the questions you ask.
When proving a property, you prove only that property; when you test a concrete call sequence with concrete values, you test with only those values.
Which is of course a good reason to get familiar with property-based testing — which lies somewhere in between the two, in that it tests a property on a number of concrete call sequences — typically a few hundred, and typically with different instances each time the test is run.

This can be a nice compromise between formal verification and hand-rolled test cases — provided of course that the properties and the instance generators are chosen well.
As always with testing, paranoia and imagination are key. But you get more value for your paranoia when it's used to power random test case generation.

For the problem in question, and assuming an Erlang implementation like KKT's, property testing can be done with relative ease using a tool like Quviq QuickCheck or Proper. Their support for testing state machines comes in handy; I've not tried using it before, but this seems a good occasion.
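To make this concrete, here is a minimal sketch of what such a state-machine callback module could look like. The callback names (initial_state/0, command/1, precondition/2, postcondition/3, next_state/3) are Proper's real ones, but the riak_coll module and its insert/delete/lookup API are hypothetical stand-ins, and the model here is a plain orddict rather than the full cloud model described below:

```erlang
%% Sketch of a proper_statem callback module for the collection scheme.
%% riak_coll and its API are hypothetical; the model is an orddict.
-module(coll_statem).
-behaviour(proper_statem).
-include_lib("proper/include/proper.hrl").

-export([initial_state/0, command/1, precondition/2,
         postcondition/3, next_state/3, prop_collection/0]).

%% The model: a plain dictionary standing in for the collection.
initial_state() -> orddict:new().

%% Generate the next (symbolic) command to run against the real collection.
command(_S) ->
    oneof([{call, riak_coll, insert, [key(), value()]},
           {call, riak_coll, delete, [key()]},
           {call, riak_coll, lookup, [key()]}]).

precondition(_S, _Call) -> true.

%% The invariant: lookup must agree with the model.
postcondition(S, {call, _, lookup, [K]}, Result) ->
    case orddict:find(K, S) of
        {ok, V} -> Result =:= {ok, V};
        error   -> Result =:= not_found
    end;
postcondition(_S, _Call, _Result) -> true.

%% Keep the model in step with the commands.
next_state(S, _Res, {call, _, insert, [K, V]}) -> orddict:store(K, V, S);
next_state(S, _Res, {call, _, delete, [K]})    -> orddict:erase(K, S);
next_state(S, _Res, _Call)                     -> S.

key()   -> oneof([a, b, c, d]).   % deliberately small domain: collisions
value() -> integer().

%% The property itself: generate and run random command sequences.
prop_collection() ->
    ?FORALL(Cmds, commands(?MODULE),
            begin
                {_History, _State, Result} = run_commands(?MODULE, Cmds),
                Result =:= ok
            end).
```

Running proper:quickcheck(coll_statem:prop_collection()) would then generate and execute a few hundred such command sequences against the real implementation.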

How to test randomly
Randomized testing involves abstracting over usage scenarios.
How the abstraction is done determines what will be tested.
Knowledge of the problem domain is the primary guide; paranoia together with perhaps knowledge of implementation details should provide additional input.
What must be considered is:
  • What is tested:
    Which API functions to exercise?
    Which invariants to verify?
  • How it is tested:
    Test concurrent use to check for thread safety?
    Test with invalid inputs?
    Should certain special circumstances be simulated - e.g., file I/O errors, disk-full conditions, network delays, packet loss?
  • With what it is tested:
    Test with abnormal (very long/short/high/low) inputs?
    Key collisions?
    Many values for the same key?
    Keys which are nearly identical?
This is of course where the imagination and paranoia enter the picture.
You can't assume that "since we generate the values randomly over a large domain, we exercise all code paths" — exercising e.g. a dictionary with a million randomly generated keys is of little use if the keys never collide.
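For instance, here is a sketch of a collision-prone key generator — written as a pure function of a random integer so it is easy to show; in a Proper test the same distribution would be expressed with generators such as frequency/2:

```erlang
%% Sketch: a key generator that deliberately produces collisions and
%% near-identical keys, as a pure function of a supplied random integer.
-module(keygen).
-export([key/1]).

%% Draw from a small pool of "hot" keys most of the time, so that
%% collisions are common; occasionally produce a long key that shares
%% a prefix with other long keys.
key(Random) when Random rem 10 < 8 ->
    %% 8 out of 10 draws: one of four hot keys, so collisions are certain
    %% over any run of reasonable length.
    lists:nth(1 + Random rem 4, [<<"k0">>, <<"k1">>, <<"k2">>, <<"k3">>]);
key(Random) ->
    %% Remaining draws: nearly identical long keys (shared prefix).
    Suffix = integer_to_binary(Random rem 100),
    <<"long-common-prefix-", Suffix/binary>>.
```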

How to test distributed collections
OK then, how to test an implementation of the distributed collections scheme?
Using the state machine testing support of Proper, we can maintain a "model" collection alongside the subject-under-scrutiny collection (shortened to SUS in the following).

What to test:
This is the easiest part: We will test the usual collection functions: insert, delete, lookup, list keys.
The invariant is that the result is consistent with the same operation performed on the model collection — as defined below.

How to test:
We'll certainly need to test concurrent use. Let's say that there are three simultaneous users of the collection; we know that there are subtleties involving two, and there may be additional ones involving at least three.
The users won't be really simultaneous, though — we will test concurrent use in a way where the concurrency is made explicit. This provides repeatability and insight into why things fail, which is indispensable for this problem domain.
So, instead of having an actual underlying Riak store, we'll mock it up, in such a way that the mock provides just the guarantees we actually expect the real thing to provide.


Model representation
The structure I have in mind is: the mock remembers all versions of all values put into it. This is what the entire simulated multi-cluster store — let's call it the "cloud" — has ever seen.
The mock furthermore has a number of "views" of the store, corresponding to what you would see if you accessed the cloud at different points. We'll make the assumption that in any of these views, for any given key, the version will only increase with time — i.e., we do not expect the version of any value in any view to evolve backwards. The mock keeps track of the view-to-version mapping for all keys.

One thing that can happen, then, besides operations on collections, is that value versions find their way from one view to another. Also, version merging can happen in transit — the versions being merged are not necessarily the latest value in any view, but may be any versions that have ever existed in the cloud.
For simplicity, let's say that this is modelled with views also — if we allow enough views, this won't reduce the generality.

The model thus consists, not just of a simple collection, but of an abstract model of the storage cloud.
Besides the externally visible events — the calls to the collection API — there are internal events: a given version of a given key finds its way from one view to another. Both kinds of events go into the randomly generated test scenario specification.
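In the same Erlang-type-spec style used further below, a generated scenario could then be a list of events of roughly this shape (the names are my own invention, not any library's):

```erlang
%% Hypothetical event types for generated test scenarios: external API
%% calls by one of the three simulated users, and internal propagation
%% of a key's version from one view of the cloud to another.
-type version() :: term().                  %% e.g. a vector clock
-type event()   ::
        {api_call, User :: 1..3,
                   Op :: insert | delete | lookup | list_keys,
                   Args :: [term()]}
      | {propagate, Key :: term(), version(),
                    FromView :: pos_integer(), ToView :: pos_integer()}.
```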

The test
It was originally my intention to end with putting all of this together in a Proper-based Erlang unit test — it would then be a good occasion for me to get experience with the state-machine modelling support (link: state machine part is  from p.26).
However, it appears that I have a latency-vs.-completeness tradeoff to make, so I'd better stop here, publish what I have, and hope to return to the subject soon, hopefully with some concrete code. (While this posting has been under way, naturally other topics have come up which I'd like to write about; the order in which further postings arrive here is undetermined...)

Tuesday, April 12, 2011

Multi-version collections in Riak

The setting:
We have a distributed, transaction-less key-value store with multi-version awareness (vector clocks as version numbers, basically). More specifically, we have Riak.

This provides us with the ability to store and retrieve blobs by key — and to do decent conflict resolution, up to a point.

That is a great building block, but there are some shortcomings, which means that we need to build on top of it.

First, the stored values have (as far as Riak is concerned) no structure.
This means that if we store rows as values, and need to update only a part of a row, we still need to fetch and store the entire row.
There are both potential bandwidth and versioning issues with this: you may have to move more data than strictly needed, and the elements of the row are (at the outset) not individually versioned, which makes conflict resolution difficult.
For the latter problem, a solution has already been devised in the form of Vector Maps.

Second, there is no concept of "collections".
This leaves you with two possibilities when you need to store a collection in Riak: storing the entire collection under one Riak key, or storing it under several Riak keys.
(I'll henceforth use the term "row" for a value stored under one Riak key, both to avoid confusion with the keys of the individual elements and because that's the term that has stuck in my head.)

Storing all of a collection in one row has the above-mentioned drawbacks: it's a waste of resources (bandwidth, CPU, disk I/O) much of the time, and you lose much of the advantage of automatic version conflict resolution because only the version of the entire value is taken into consideration.

Storing the collection in different rows also has its drawbacks, though — most significantly, you lose data locality. To fetch a collection of 100 elements, spread over 100 different keys, 100 separate disk locations may have to be accessed. (Unless perhaps you're so clever that you can trick the hashing function, that is...)

A side note:
Riak has a "link" concept which enables you to link objects associated with different keys together; sadly, it has both the locality issue (which can't be helped) and the versioning issue (which is a pity, this being Riak).
For instance, consider an object which in version [] has one link, "foo"; in version [{a,1}] the foo link is deleted; and in version [{b,1}] another link, "bar", is added. Based on versions [{a,1}] and [{b,1}], with link sets Ø and {foo,bar}, respectively, you can't conclude anything about which links should be present in a merge. (Even if the version [] was accessible to Riak, it would be disregarded entirely.)
So, if you want to take advantage of Riak's multiversioning support, you'd better stay clear of links. Or else use them in a disciplined manner — i.e., only adding, never changing or deleting them.

The concerns
Summing up so far, we have these concerns when storing collections:
  1. The ability to store an arbitrary number of elements (obviously).
  2. Data locality. — This concern alone would have all elements in one row.
  3. Manageable row size. — This concern alone would have one element in each row.
  4. Efficient element lookup.
    (In the following, the elements are assumed to be associated with some kind of key.)
  5. Easy conflict resolution.
    (E.g., when creating a new row, expect that another Riak node might also decide to create a row at the same time, based on the same decisions; this may influence the way you choose keys. Also, don't move data from one row to another too much.)
(A non-concern, luckily, is hard limits e.g. on row size. The present problem has many features in common with the issue of how to structure database storage, but while DBs have to respect the size of a disk block, we have more freedom. A good thing, too — dealing with concurrency is quite enough.)

The conflict between concerns (2) and (3) is obvious.
One straightforward compromise between the two is "add elements to a row until it reaches a certain size (say, N elements), then create a new row for the next N elements, and so on", but this conflicts with concern (4).

Hash tables
So, we need a growable, block-based collection, in which elements can be looked up efficiently; what are our options?
A hash table as it is usually implemented — using hash value modulo bucket count to get the bucket number — is not so good a choice in light of (5); global redistribution of the data is a thing to be avoided.

Taking inspiration from database systems, however, we might consider the hash table variants used there, secondary-storage hash tables (link?).
These work by using a fixed number of bits of the hash value — a bitwise prefix or suffix — as the bucket indicator.

The scheme I have in mind differs from both of the variants described in my book on database systems, but the basic idea is the same — after all, the main concern, touching as few blocks as possible at each access, is the same.
I'll sketch my scheme below.

Collection representation using secondary-storage hash table
First, a bit of basics and assumptions:
I'll assume that we have a Riak bucket at our disposal for collection storage, and that each collection is associated with an alphanumeric key in that bucket.

Each element is versioned individually, for reasons mentioned earlier:
VersionedElement :: {ElementKey, [{VClock, Data}]}
(I'll be using Erlang type specifications.)

After a version merge, multiple versions of the element may be present, with pairwise independent versions (i.e. none of them "happens before" any of the other); this is why there is a list of versioned values rather than a single value.

As mentioned, we use a suffix of the hash value for indexing the auxiliary rows.
The two approaches for extensible secondary-storage hash tables which are described in my database book ("Database Systems: The Complete Book", by Garcia-Molina, Ullman and Widom; interesting reading) are:
  • "Extensible hash table" — in which a hash table's current structure is described by the number i of bits of suffix used for selecting the hash bucket. Drawback: the table extension from 2^i to 2^(i+1) bucket entries has to be done all at once.
  • "Linear hash table" — which employs an incremental growing approach, and in which the structure is described by (i,k), where i is as above and k is the current number of buckets, 2^(i-1) < k <= 2^i. If a suffix is below k, it's used as the bucket number; if it is k or larger, then the suffix of length i-1 is used (i.e., clear the MSB of the original suffix). When k is increased, the elements in the preexisting bucket j are redistributed between j and k — where j is k with the MSB cleared.
    Quite elegant, really.
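As a sketch, the linear-table bucket selection amounts to a couple of lines (here Suffix is the i-bit hash suffix as an integer):

```erlang
%% Sketch of linear-hash bucket selection for table state (I, K),
%% where 2^(I-1) < K =< 2^I: an I-bit hash suffix maps either to
%% itself or, with its MSB cleared, to an (I-1)-bit bucket number.
-module(linear_hash).
-export([bucket/3]).

bucket(Suffix, _I, K) when Suffix < K ->
    Suffix;                               % suffix is a valid bucket number
bucket(Suffix, I, _K) ->
    Suffix band ((1 bsl (I - 1)) - 1).    % clear the MSB, use I-1 bits
```

With (i,k) = (3,5), for example, suffix 6 (110) lands in bucket 2 (10), while suffix 4 is used directly.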
In our case, the concerns and constraints are similar but different; for one thing, we can get away with using less compact and simple data structures. More specifically, instead of a couple of numbers we're going to have a list of buckets:
RowPointer :: {BitCount :: integer(), BitSuffix :: integer()}
This allows us to split exactly the hash buckets that need splitting, rather than having to rely on a predetermined splitting order.
(This structuring turns out to be similar to the one Wikipedia calls extendible hashing, except for the representation of the bit suffix table.)
Consider a collection stored under Key. The collection's main row looks like this:
Key → MainRow
where
MainRow :: {[VersionedElement], [RowPointer]}
and VersionedElement and RowPointer are described above.

For locality, we allow a limited number of elements to be stored directly in the main row. This is an optimization, of course, which ensures that for small collections, one row access is needed rather than two; it can be easily omitted but I see no harm in it.

The RowPointer set must be internally consistent: For each possible bit pattern, there should be at most one RowPointer matching that pattern.
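As a sketch, matching and the consistency requirement might look as follows; the formulation of consistency (no pointer's suffix may extend another pointer's suffix) is my own reading of "at most one RowPointer matching each pattern":

```erlang
%% Sketch: RowPointer matching on hash suffixes, and the internal
%% consistency check for a set of RowPointers.
-module(rowptr).
-export([matches/2, consistent/1]).

%% A RowPointer {BitCount, BitSuffix} matches a hash whose lowest
%% BitCount bits equal BitSuffix.
matches(Hash, {BitCount, BitSuffix}) ->
    Hash band ((1 bsl BitCount) - 1) =:= BitSuffix.

%% At most one pointer may match any bit pattern: no pointer's suffix
%% may be an extension of another pointer's (shorter or equal) suffix.
consistent(Pointers) ->
    not lists:any(
          fun({{C1, S1}, {C2, S2}}) ->
                  C1 =< C2 andalso S2 band ((1 bsl C1) - 1) =:= S1
          end,
          [{P, Q} || P <- Pointers, Q <- Pointers, P =/= Q]).
```

For example, the set {1,0}, {2,1}, {2,3} is consistent (patterns ending in 0, 01 and 11 respectively), while adding {2,2} to {1,0} is not, since the suffix 10 extends the suffix 0.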

For auxiliary rows, we use the key format
AuxKey = Key#BitCount,BitSuffix
(where '#' is a character guaranteed never to be present in Key. Alternatively, use Key# as key for the main row.)
The auxiliary rows contain just elements:
AuxKey → [VersionedElement]

Consistency and conflict resolution
Let us define criteria for a reliable collection storing scheme, to guide the further design:
  • Firstly, we can determine the key set for a given collection (subject to which revision versions are visible to us).
  • Secondly, given a collection and a key, we can determine the last version(s) of the associated value.
Part of the first criterion is that deleted elements stay deleted, even when versions of the collection in which an older version of the element is present are still "versionally visible".
Part of the second criterion is, strictly speaking, that if a version is visible in which an element was deleted, then that element is either not present at all in the key set, or the concluded "last version(s)" include a 'deleted' value — i.e., it will not be forgotten that it was once deleted.

Version-Merging Algorithm
When merging two versions of the collection, we have for each row a number of versions of that row available. In Riak we can't tell across rows which versions belong together.

An outline of a merge algorithm is as follows:
  • First, determine the set of keys.
  • Next, for each key,
    • Determine the most recent version(s).

Determining the most recent versions of each element is a known and solved problem; the interesting part is therefore finding all of the relevant versions.

First attempt:
  • Take the union of the RowPointers set from all versions of the main row.
  • For each of these rows, take all element versions.

This doesn't take deletions into account, however.
To handle deletions, I see two options:

1: The tombstone approach. We represent deleted elements by explicit tombstone values (just as it is done in vector maps and, I believe, in Riak itself).

It would be nicer to have deleted elements be removed from the system altogether. Is this possible, within the constraints we've set up?

Consider a merge operation of two independent versions "D" and "P" of a row; a given element "X" is present in "P", but absent in "D". Assume further that the element modification version is later than the last common version ("LCV"). How do we know whether "X" was present in the LCV, then deleted in "D" and modified in "P" (in which case we must include both options, present and deleted, in the merge result: [XP, tombstone]), or whether it was created since the LCV (in which case it must be present, not deleted, in the merge result: [XP])?

           LCV
          / \
delete(X)/   \update(X)
        /     \
       D       P
 (X absent)  (X → XP)

This question leads to:

2: The extra versioning approach. For each element, include both the version of the last modification and the version of its creation. Furthermore, include for each row the version of the last modification to the row (these versions must be comparable to the versions of the individual elements, by the way).

With this extra information, we can distinguish the two cases: if the row version of "D" is later than X's creation version, then X was deleted (result = [XP, tombstone]); if not, then X was created since (result = [XP]).
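As a sketch, with plain integers standing in for comparable versions (a simplification on my part; real versions would be vector clocks with a dominance test):

```erlang
%% Sketch of the merge decision for an element X that is present in
%% "P" (as XP, created at version CreatedVsn) but absent in "D"
%% (whose row was last modified at version RowVsnD).
-module(merge_decide).
-export([merge_absent/3]).

merge_absent(XP, CreatedVsn, RowVsnD) when RowVsnD > CreatedVsn ->
    [XP, tombstone];   % D had seen X and deleted it: keep both outcomes
merge_absent(XP, _CreatedVsn, _RowVsnD) ->
    [XP].              % X was created after D's last change: not deleted
```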

Interestingly, the two approaches (1) and (2) are not equivalent, but yield different results in certain cases.
Consider for instance the following history:

                V0
               /:
              / :
create(Y) - A1  V1 - create(X)
               /|
              / |
delete(X) - A2  V2 - delete(X)
               /:
              / :
create(Y) - A3  V3 - create(X)
               /|
              / |
delete(X) - A4  V4 - update(X)


and assume that the input versions available to the merge operation are V4 and one of {A2, A3}.
Then, using the tombstone approach, both A2 and A3 contain a tombstone value for X, and we must include it in the merge result.
Using the extra versioning approach, on the other hand, we can conclude that the X present in V4 was created at a point not "versionally visible" to either A2 or A3, and thus exists without question after the merge.

(If merging (V4 and A1) or (V4 and A4), the two approaches yield the same result.)

Anyway, I hope to have convinced you that we can resolve conflicts reliably.
Having gotten that out of the way, let's turn to how the data structure is actually manipulated.

Collection Manipulation Algorithms
In the insertion and deletion algorithms below, the possibility of write conflicts has been ignored. This is intentional: Riak lets you detect the write conflict, and it can then be repaired — immediately, or later as read repair — using the principles described above.
Note that the organization of the data means that repairs caused by write conflicts are local, i.e. only a few rows need to be repaired, not the entire collection.

Lookup algorithm:
Given a collection key, and an element key, look up the key in the collection. Answer with the versioned element or "not found".
- Fetch the main row of the collection: {Elements, RowPointers}.
- Is the element among Elements?
  - If yes: There's your answer.
  - If no: Is there a RowPointer matching the key?
    - If no: Conclude "not found".
    - If yes: Fetch the auxiliary row pointed to by RowPointer: Elements2.
      - Is the element in Elements2?
        - If yes: There's your answer.
        - If no: Conclude "not found".
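The steps above can be sketched in Erlang. To keep the sketch self-contained, the Riak GET and the hash function are passed in as funs — these, and the result shapes, are assumptions for illustration, not a real API:

```erlang
%% Sketch of the lookup algorithm. Fetch stands in for a Riak GET,
%% Hash for the element-key hash function; both are hypothetical.
-module(coll_lookup).
-export([lookup/4]).

lookup(Fetch, Hash, CollKey, ElemKey) ->
    {Elements, RowPointers} = Fetch(CollKey),        % the main row
    case lists:keyfind(ElemKey, 1, Elements) of
        {_, _} = Found ->
            {ok, Found};                             % found in the main row
        false ->
            case matching_pointer(Hash(ElemKey), RowPointers) of
                none ->
                    not_found;                       % no aux row can hold it
                {BitCount, BitSuffix} ->
                    Elements2 = Fetch(aux_key(CollKey, BitCount, BitSuffix)),
                    case lists:keyfind(ElemKey, 1, Elements2) of
                        {_, _} = Found -> {ok, Found};
                        false          -> not_found
                    end
            end
    end.

%% First RowPointer whose BitSuffix equals the low BitCount bits of Hash.
matching_pointer(_Hash, []) ->
    none;
matching_pointer(Hash, [{C, S} | _Rest]) when Hash band ((1 bsl C) - 1) =:= S ->
    {C, S};
matching_pointer(Hash, [_ | Rest]) ->
    matching_pointer(Hash, Rest).

%% The "Key#BitCount,BitSuffix" auxiliary key format from above.
aux_key(Key, C, S) ->
    iolist_to_binary([Key, "#", integer_to_list(C), ",", integer_to_list(S)]).
```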

Update algorithm:
Like the lookup algorithm, with the obvious additions. Left as an exercise to the reader.

Insertion algorithm:
Given a collection key, an element key and a value, insert the (key,value) pair in the collection.

- Perform an Update of the key. If the key was found, we are done; if not, continue.
- Fetch the main row of the collection: {Elements, RowPointers}.
- Is there room for another element in Elements, without exceeding the size threshold?
  - If yes: Insert the versioned element in the main row.
  - If no: Is there a RowPointer matching the key?
    - If yes: Fetch the row pointed to by RowPointer, and add the versioned element.
      - Is the size limit exceeded?
        - If no: Write the auxiliary row back.
        - If yes: Split the auxiliary row into two rows with a BitCount increased by one.
          - Write these rows.
          - Update the main row with the new row pointers.
          - Write an empty row back under the original auxiliary row key. (Optional?)
    - If no:
      - Create a RowPointer matching the key and having as small a BitCount as possible.
      - Insert an auxiliary row for that RowPointer, containing just the new element.
      - Update the main row, adding the RowPointer.

Deletion algorithm:
Given a collection key, and an element key, delete the element with that key from the collection (if it is present).
Done like Update with a 'tombstone' value — if the element is present; if it isn't, nothing should be done.

Conclusion
Riak is a good building block for a distributed storage system, but for certain applications you need some appropriate abstractions on top of the raw key-value store. There are for instance good reasons to put some thought into the organization of collections of values, and ensure that conflicting versions can be dealt with appropriately.
I have presented some of the issues involved and attempted to put together a usable scheme.
At this point, it's all talk and no code, though — not a bad starting point, but I cannot claim much about the resulting scheme except that, given that this text ended up much longer and with many more asides than I originally envisioned, I must have put some thought into it.

(Update: There is now a follow-up. Still more thinking than code, though -- for my part, at least.)

Oh, and by the way, I hope to be able to find the time to explain why I'm writing about Riak, all of a sudden.