Tuesday, April 12, 2011

Multi-version collections in Riak

The setting:
We have a distributed, transaction-less key-value store with multi-version awareness (vector clocks as version numbers, basically). More specifically, we have Riak.

This provides us with the ability to store and retrieve blobs by key — and to do decent conflict resolution, up to a point.

That is a great building block, but it has some shortcomings which mean that we need to build on top of it.

First, the stored values have (as far as Riak is concerned) no structure.
This means that if we store rows as values, and need to update only a part of a row, we still need to fetch and store the entire row.
There are both potential bandwidth and versioning issues with this: you may have to move more data than strictly needed, and the elements of the row are (at the outset) not individually versioned, which makes conflict resolution difficult.
For the latter problem, a solution has already been devised in the form of Vector Maps.

Second, there is no concept of "collections".
This leaves you with two possibilities when you need to store a collection in Riak: storing the entire collection under one Riak key, or storing it under several Riak keys.
(I'll henceforth use the term "row" for a value stored under one Riak key, both to avoid confusion with the keys of the individual elements and because that's the term that has become stuck in my head.)

Storing all of a collection in one row has the above-mentioned drawbacks: it's a waste of resources (bandwidth, CPU, disk I/O) much of the time, and you lose many of the advantages of automatic version conflict resolution because only the version of the entire value is taken into consideration.

Storing the collection in different rows also has its drawbacks, though — most significantly, you lose data locality. To fetch a collection of 100 elements, spread over 100 different keys, 100 separate disk locations may have to be accessed. (Unless perhaps you're so clever that you can trick the hashing function, that is...)

A side note:
Riak has a "link" concept which enables you to link objects associated with different keys together; sadly, it has both the locality issue (which can't be helped) and the versioning issue (which is a pity, this being Riak).
For instance, consider an object which in version [] has one link, "foo"; in version [{a,1}] the foo link is deleted; and in version [{b,1}] another link, "bar", is added. Based on versions [{a,1}] and [{b,1}], with link sets Ø and {foo,bar}, respectively, you can't conclude anything about which links should be present in a merge. (Even if the version [] was accessible to Riak, it would be disregarded entirely.)
So, if you want to take advantage of Riak's multiversioning support, you'd better stay clear of links. Or else use them in a disciplined manner — i.e., only adding, never changing or deleting them.

The concerns
Summing up so far, we have these concerns when storing collections:
  1. The ability to store an arbitrary number of elements (obviously).
  2. Data locality. — This concern alone would have all elements in one row.
  3. Manageable row size. — This concern alone would have one element in each row.
  4. Efficient element lookup.
    (In the following, the elements are assumed to be associated with some kind of key.)
  5. Easy conflict resolution.
    (E.g., when creating a new row, expect that another Riak node might also decide to create a row at the same time, based on the same decisions; this may influence the way you choose keys. Also, don't move data from one row to another too much.)
(A non-concern, luckily, is hard limits e.g. on row size. The present problem has many features in common with the issue of how to structure database storage, but while DBs have to respect the size of a disk block, we have more freedom. A good thing, too — dealing with concurrency is quite enough.)

The conflict between concerns (2) and (3) is obvious.
One straightforward compromise between the two is "add elements to a row until it reaches a certain size (say, N elements), then create a new row for the next N elements, and so on", but this conflicts with concern (4).

Hash tables
So, we need a growable, block-based collection, in which elements can be looked up efficiently; what are our options?
A hash table as it is usually implemented — using hash value modulo bucket count to get the bucket number — is not so good a choice in light of (5); global redistribution of the data is a thing to be avoided.

Taking inspiration from database systems, however, we might consider the hash table variants used there, secondary-storage hash tables (link?).
These work by using a bitwise suffix of the hash value as the bucket indicator.

The scheme I have in mind differs from both of the variants described in my book on database systems, but the basic idea is the same — after all, the main concern of touching as few blocks as possible at each access is the same.
I'll sketch my scheme below.

Collection representation using secondary-storage hash table
First, a bit of basics and assumptions:
I'll assume that we have a Riak bucket at our disposal for collection storage, and that each collection is associated with an alphanumeric key in that bucket.

Each element is versioned individually, for reasons mentioned earlier:
VersionedElement :: {ElementKey, [{VClock, Data}]}
(I'll be using Erlang type specifications.)

After a version merge, multiple versions of the element may be present, with pairwise independent versions (i.e. none of them "happens before" any of the other); this is why there is a list of versioned values rather than a single value.

As mentioned, we use a suffix of the hash values for indexing the auxiliary rows.
The two approaches for extensible secondary-storage hash tables which are described in my database book ("Database Systems: The Complete Book", by Garcia-Molina, Ullman and Widom; interesting reading) are:
  • "Extensible hash table" — in which a hash table's current structure is described by the number i of bits of suffix used for selecting the hash bucket. Drawback: the extension of the bucket table from 2^i to 2^(i+1) entries has to be done all at once.
  • "Linear hash table" — which employs an incremental growing approach, and in which the structure is described by (i,k) where i is as above and k is the current number of buckets, 2^(i-1) < k <= 2^i. If a suffix is below k, it's used as the bucket number; if it is k or larger, then the suffix of length i-1 is used (i.e., clear the MSB of the original suffix; this selection rule is sketched below). When k is increased, the elements in preexisting bucket j are redistributed between j and k — where j is k with the MSB cleared.
    Quite elegant, really.
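
To make that selection rule concrete, here is a minimal Erlang sketch of it. This is just an illustrative fragment (bucket/3 is a made-up helper, and an integer hash à la erlang:phash2/1 is assumed):

  %% Minimal sketch of the linear-hash-table bucket selection rule.
  %% I is the current suffix length in bits, K the current bucket count,
  %% with 2^(I-1) < K =< 2^I.
  bucket(Hash, I, K) ->
      Suffix = Hash band ((1 bsl I) - 1),                   % the low I bits of the hash
      if
          Suffix < K -> Suffix;                             % bucket exists; use it as-is
          true       -> Suffix band ((1 bsl (I - 1)) - 1)   % clear the MSB of the suffix
      end.
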
In our case, the concerns and constraints are similar but different; for one thing, we can get away with using less compact and simple data structures. More specifically, instead of a couple of numbers we're going to have a list of buckets:
RowPointer :: {BitCount :: integer(), BitSuffix :: integer()}
This allows us to split exactly the hash buckets that need splitting, rather than having to rely on a predetermined splitting order.
(This structure turns out to be similar to what Wikipedia calls extendible hashing, except for the representation of the bit suffix table.)
Consider a collection stored under Key. The collection's main row looks like this:
Key → MainRow
where
MainRow :: {[VersionedElement], [RowPointer]}
and VersionedElement and RowPointer are described above.
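
For concreteness, the same shapes written as Erlang type declarations might look roughly like this (a sketch; the concrete key, clock and data types are just placeholders):

  %% Sketch: the collection structures as Erlang type declarations.
  %% The concrete key, clock and data types are placeholders.
  -type element_key()       :: binary().
  -type vclock()            :: [{Actor :: term(), Count :: integer()}].
  -type versioned_element() :: {element_key(), [{vclock(), Data :: term()}]}.
  -type row_pointer()       :: {BitCount :: integer(), BitSuffix :: integer()}.
  -type main_row()          :: {[versioned_element()], [row_pointer()]}.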

For locality, we allow a limited number of elements to be stored directly in the main row. This is an optimization, of course, which ensures that for small collections, one row access is needed rather than two; it can be easily omitted but I see no harm in it.

The RowPointer set must be internally consistent: For each possible bit pattern, there should be at most one RowPointer matching that pattern.

For auxiliary rows, we use the key format
AuxKey = Key#BitCount,BitSuffix
(where '#' is a character guaranteed never to be present in Key. Alternatively, use Key# as key for the main row.)
The auxiliary rows contain just elements:
AuxKey → [VersionedElement]
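
To make the routing concrete, here is a sketch of how an element key could be matched against the RowPointer set and how the auxiliary key could be built. The function names are made up, and erlang:phash2/1 merely stands in for whatever hash function is actually used:

  %% Build the auxiliary row key "Key#BitCount,BitSuffix".
  aux_key(Key, {BitCount, BitSuffix}) ->
      iolist_to_binary([Key, $#, integer_to_list(BitCount),
                        $,, integer_to_list(BitSuffix)]).

  %% Find the RowPointer (if any) whose BitSuffix equals the low
  %% BitCount bits of the element key's hash.
  find_row_pointer(ElementKey, RowPointers) ->
      Hash = erlang:phash2(ElementKey),
      Matches = [RP || RP = {BitCount, BitSuffix} <- RowPointers,
                       Hash band ((1 bsl BitCount) - 1) =:= BitSuffix],
      case Matches of
          [RP | _] -> {ok, RP};
          []       -> none
      end.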

Consistency and conflict resolution
Let us define criteria for a reliable collection storing scheme, to guide the further design:
  • Firstly, we can determine the key set for a given collection (subject to which versions are visible to us).
  • Secondly, given a collection and a key, we can determine the last version(s) of the associated value.
Part of the first criterion is that deleted elements stay deleted, even when other versions of the collection are visible in which an older version of the element is present.
Part of the second criterion is, strictly speaking, that if a version is visible in which an element was deleted, then that element is either not present at all in the key set, or the concluded "last version(s)" include a 'deleted' value — i.e., it will not be forgotten that it was once deleted.

Version-Merging Algorithm
When merging two versions of the collection, we have for each row a number of versions of that row available. In Riak we can't tell across rows which versions belong together.

An outline of a merge algorithm is as follows:
  • First, determine the set of keys.
  • Next, for each key,
    • Determine the most recent version(s).

Determining the most recent versions of each element is a known and solved problem; the interesting part is therefore finding all of the relevant versions.

First attempt:
  • Take the union of the RowPointer sets from all versions of the main row.
  • For each of these rows, take all element versions.
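
A sketch of this first attempt, with FetchAll as a hypothetical callback returning every visible version of the value stored under a Riak key, and aux_key/2 as sketched earlier:

  %% Sketch of the first attempt: collect every element version reachable
  %% from any visible version of the main row.  FetchAll(Key) is assumed
  %% to return the list of visible versions of the value under Key.
  all_element_versions(CollectionKey, FetchAll) ->
      MainVersions = FetchAll(CollectionKey),    % [{Elements, RowPointers}]
      RowPointers  = lists:usort(lists:append([RPs || {_, RPs} <- MainVersions])),
      DirectElems  = lists:append([Es || {Es, _} <- MainVersions]),
      AuxElems     = lists:append([lists:append(FetchAll(aux_key(CollectionKey, RP)))
                                   || RP <- RowPointers]),
      DirectElems ++ AuxElems.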

This doesn't take deletions into account, however.
To handle deletions, I see two options:

1: The tombstone approach. We represent deleted elements by explicit tombstone values (just as it is done in vector maps and, I believe, in Riak itself).

It would be nicer to have deleted elements be removed from the system altogether. Is this possible, within the constraints we've set up?

Consider a merge operation of two independent versions "D" and "P" of a row; a given element "X" is present in "P", but absent in "D". Assume further that the element's modification version is later than the last common version ("LCV"). How do we know whether "X" was present in the LCV, then deleted in "D" and modified in "P" (in which case we must include both options, present and deleted, in the merge result: [XP, tombstone]), or whether it was created since the LCV (in which case it must be present, not deleted, in the merge result: [XP])?

           LCV
          / \
delete(X)/   \update(X)
        /     \
       D       P
 (X absent)  (X → XP)

This question leads to:

2: The extra versioning approach. For each element, include both the version of the last modification and the version of its creation. Furthermore, include for each row the version of the last modification to the row (these versions must be comparable to the versions of the individual elements, by the way).

With this extra information, we can distinguish: If the row modification version of "D" is later than X's creation version, then X was deleted in "D" (result = [XP, tombstone]); if not, then X was created after the LCV (result = [XP]).
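
Phrased in terms of vector clocks, that decision might look like this (a sketch; Descends(A, B) stands in for the vector-clock comparison "A happens after or equals B", and all names are hypothetical):

  %% Merge decision for an element X present in row version P but absent
  %% in row version D, under the extra-versioning approach.
  %%   XP        - X's versioned value as found in P
  %%   XCreated  - the vector clock recorded when X was created
  %%   DRowClock - D's last-modification vector clock
  %%   Descends  - fun(A, B) -> true iff A happens after (or equals) B
  merge_absent_vs_present(XP, XCreated, DRowClock, Descends) ->
      case Descends(DRowClock, XCreated) of
          true  -> [XP, tombstone];  % D saw X's creation yet lacks X: it was deleted
          false -> [XP]              % X was created after D branched off: keep it
      end.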

Interestingly, the two approaches (1) and (2) are not equivalent, but yield different results in certain cases.
Consider for instance the following history:

                V0
               /:
              / :
create(Y) - A1  V1 - create(X)
               /|
              / |
delete(X) - A2  V2 - delete(X)
               /:
              / :
create(Y) - A3  V3 - create(X)
               /|
              / |
delete(X) - A4  V4 - update(X)


and assume that the input versions available to the merge operation are V4 and one of {A2, A3}.
Then, using the tombstone approach, A2 and A3 contain a tombstone value for X, and we must include it in the merge result.
Using the extra versioning approach, on the other hand, we can conclude that the X present in V4 is created at a point not "versionally visible" to either A2 or A3, and thus exists without question after the merger.

(If merging (V4 and A1) or (V4 and A4), the two approaches yield the same result.)

Anyway, I hope to have convinced you that we can resolve conflicts reliably.
Having gotten that out of the way, let's turn to how the data structure is actually manipulated.

Collection Manipulation Algorithms
In the insertion and deletion algorithms below, the possibility of write conflicts has been ignored. This is intentional; Riak lets you detect the write conflict, and it can then be repaired — instantly or later, as read repair — using the principles described above.
Note that the organization of the data means that repairs caused by write conflicts are local, i.e. only a few rows need to be repaired, not the entire collection.

Lookup algorithm:
Given a collection key, and an element key, look up the key in the collection. Answer with the versioned element or "not found".
- Fetch the main row of the collection: {Elements, RowPointers}.
- Is the element among Elements?
  - If yes: There's your answer.
  - If no: Is there a RowPointer matching the key?
    - If no: Conclude "not found".
    - If yes: Fetch the auxiliary row pointed to by RowPointer: Elements2.
      - Is the element in Elements2?
        - If yes: There's your answer.
        - If no: Conclude "not found".
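
For illustration, a sketch of the lookup in Erlang; Fetch is a hypothetical fun returning {ok, Value} or not_found for a Riak key, and aux_key/2 and find_row_pointer/2 are as sketched earlier:

  %% Sketch of the lookup algorithm.
  lookup(CollectionKey, ElementKey, Fetch) ->
      case Fetch(CollectionKey) of
          not_found ->
              not_found;
          {ok, {Elements, RowPointers}} ->
              case lists:keyfind(ElementKey, 1, Elements) of
                  {ElementKey, Versions} ->
                      {ok, Versions};
                  false ->
                      case find_row_pointer(ElementKey, RowPointers) of
                          none ->
                              not_found;
                          {ok, RP} ->
                              case Fetch(aux_key(CollectionKey, RP)) of
                                  not_found ->
                                      not_found;
                                  {ok, Elements2} ->
                                      case lists:keyfind(ElementKey, 1, Elements2) of
                                          {ElementKey, Vs} -> {ok, Vs};
                                          false            -> not_found
                                      end
                              end
                      end
              end
      end.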

Update algorithm:
Like the lookup algorithm, with the obvious additions. Left as an exercise to the reader.

Insertion algorithm:
Given a collection key, an element key and a value, insert the (key,value) pair in the collection.

- Perform an Update of the key. If the key was found, we are done; if not, continue.
- Fetch the main row of the collection: {Elements, RowPointers}.
- Is there room for another element in Elements, without exceeding the size threshold?
  - If yes: Insert the versioned element in the main row.
  - If no: Is there a RowPointer matching the key?
    - If yes: Fetch the row pointed to by RowPointer, and add the versioned element.
      - Is the size limit exceeded?
        - If no: Write the auxiliary row back.
        - If yes: Split the auxiliary row into two rows with a BitCount increased by one.
          - Write these rows.
          - Update the main row with the new row pointers.
          - Write an empty row back under the original auxiliary row key. (Optional?)
    - If no:
      - Create a RowPointer matching the key and having as small a BitCount as possible.
      - Insert an auxiliary row for that RowPointer, containing just the new element.
      - Update the main row, adding the RowPointer.
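
The interesting step above is the split; here is a sketch of splitting one auxiliary row into two rows with BitCount increased by one (again with erlang:phash2/1 standing in for the real hash function, and the function name made up):

  %% Split the elements of the auxiliary row identified by {BitCount, BitSuffix}
  %% into two rows with BitCount + 1.  Every element already matches the old
  %% suffix, so only hash bit number BitCount decides where each element goes.
  split_aux_row({BitCount, BitSuffix}, Elements) ->
      LowPtr  = {BitCount + 1, BitSuffix},
      HighPtr = {BitCount + 1, BitSuffix bor (1 bsl BitCount)},
      GoesHigh = fun({ElementKey, _Versions}) ->
                         erlang:phash2(ElementKey) band (1 bsl BitCount) =/= 0
                 end,
      {High, Low} = lists:partition(GoesHigh, Elements),
      [{LowPtr, Low}, {HighPtr, High}].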


Deletion algorithm:
Given a collection key, and an element key, delete the element with that key from the collection (if it is present).
Done like Update with a 'tombstone' value — if the element is present; if it isn't, nothing should be done.

Conclusion
Riak is a good building block for a distributed storage system, but for certain applications you need some appropriate abstractions on top of the raw key-value store. There are for instance good reasons to put some thought into the organization of collections of values, and ensure that conflicting versions can be dealt with appropriately.
I have presented some of the issues involved and attempted to put together a usable scheme.
At this point, it's all talk and no code, though — not a bad starting point, but I cannot claim much about the resulting scheme except that, given that this text ended up much longer and with many more asides than I originally envisioned, I must have put some thought into it.

(Update: There is now a follow-up. Still more thinking than code, though -- for my part, at least.)

Oh, and by the way, I hope to be able to find the time to explain why I'm writing about Riak, all of a sudden.
