High contention allocator
by ananthakumaran

FoundationDB is a distributed key-value store. The whole database can be considered as a giant sorted map where keys and values are bytes. FoundationDB client libraries provide two means for handling namespaces.

Subspace - Raw bytes like users:friends or users:profile is prefixed to keys before they get stored and stripped from the keys when they are retrieved.

alias FDB.Coder.{Subspace, Integer, ByteString}
alias FDB.Transaction

coder =
  Transaction.Coder.new(
    # key encoding
    Subspace.new("users:friends", Integer.new()),
    # value encoding
    ByteString.new()
  )
user = Transaction.get(tr, 42, %{coder: coder})

The key is encoded as users:friends\x21\x42. The primary disadvantage is the size of prefix, which increases as the level of nesting increases.

Directory - The size problem is solved by converting the namespace to a short unique prefix. There is an extra cost involved when a directory is created or opened compared to the subspace, but this will get amortized over time if the number of directories used is small enough compared to other requests.

alias FDB.Directory

root = Directory.new()
coder =
  Transaction.Coder.new(
    Subspace.new(
      Directory.create_or_open(root, tr, ["users", "friends"]),
      Integer.new()
    ),
    ByteString.new()
  )
user = Transaction.get(tr, 42, %{coder: coder})

The key will get encoded something like \x21\x8\x21\x42. Internally a mapping from ["users", "friends"] to the prefix \x21\x8 is stored. This has the added advantage that renaming the namespace for example from ["users", "friends"] to ["v1", "users", "friends"] could be done without changing all the keys. Only the internal mapping has to be updated.

This blog post attempts to explain the algorithm used to create short unique prefixes in the FoundationDB client libraries. To understand the issues involved, a naive implementation is presented first and then its performance characteristics are contrasted with the algorithm used in the client libraries.

defmodule NaiveHCA do
  alias FDB.{Database, Transaction}

  def allocate(transaction) do
    <<counter::integer-64>> = Transaction.get(t, "counter") || <<0::integer-64>>
    counter = counter + 1
    :ok = Transaction.set(t, "counter", <<counter::integer-64>>)
    encoded = :binary.encode_unsigned(counter)
    <<byte_size(encoded)>> <> encoded
  end
end

A single 64 bit counter is used to store the state and numbers are encoded with minimum required bytes – without leading zeros. To avoid prefix clash between numbers of different byte sizes like 0x05 and 0x0501, the byte size is also added as a prefix.

Why does the allocations per second seem to max out at about 300?. FoundationDB uses multi-version concurrency control (MVCC) for reads and optimistic concurrency for writes. As a result, neither reads nor writes are blocked by other readers or writers. Instead, conflicting transactions will fail at commit time and should be retried by the client. The common idiom is to use Database.transact which will retry until the commit succeeds.

FDB.Database.transact(database, fn t ->
  NaiveHCA.allocate(t)
end)

In the naive implementation, the number of conflicts increases as the concurrency increases. The major focus of the algorithm is to reduce the number of conflicts.

Instead of using a single counter a window is used. Each process tries to acquire a random value within the current window. Once the current window is half filled, the window is advanced. The number of conflicting transactions will be small if the window size is big enough compared to the number of concurrent processes.

defmodule HCA do
  alias FDB.Coder.{Tuple, Integer, Subspace}
  alias FDB.Transaction
  alias FDB.KeySelectorRange
  alias FDB.KeyRange
  alias FDB.Option

  @counter 0
  @reserve 1

  def allocate(t) do
    hca_coder =
      Transaction.Coder.new(
        Subspace.new("hca", Tuple.new({Integer.new(), Integer.new()})),
        LittleEndianInteger.new(64)
      )

    t = Transaction.set_coder(t, hca_coder)
    prefix = search(t)
    encoded = :binary.encode_unsigned(prefix)
    <<byte_size(encoded)>> <> encoded
  end

How the keys and values should be encoded is defined using the hca_coder. The actual work is delegated to the search function.

  defp search(t) do
    candidate_range = range(t, current_start(t), false)
    search_candidate(t, candidate_range)
  end

The search function calculates the current window and calls search_candidate to find a candidate within the window.

  defp current_start(t) do
    result =
      Transaction.get_range(t, KeySelectorRange.starts_with({@counter}), %{
        limit: 1,
        reverse: true,
        snapshot: true
      })
      |> Enum.to_list()

    case result do
      [] -> 0
      [{{@counter, start}, _}] -> start
    end
  end

The current window’s start and the number of values so far allocated within that window are stored in the following format.

{@counter, start} => allocated_count

This function returns the latest window start value

  defp range(t, start, window_advanced) do
    if window_advanced do
      clear_previous_window(t, start)
    end

    Transaction.atomic_op(t, {@counter, start}, Option.mutation_type_add(), 1)
    count = Transaction.get(t, {@counter, start}, %{snapshot: true}) || 0
    window = window_size(start)

    if count * 2 < window do
      start..(start + window - 1)
    else
      range(t, start + window, true)
    end
  end

  defp window_size(start) do
    cond do
      start < 255 -> 64
      start < 65535 -> 1024
      true -> 8192
    end
  end

range is responsible for calculating the current window range. The allocated count is incremented beforehand and the window is advanced if half filled.

Conflict range

By default any read or write inside a transaction creates a read or write conflict range for the corresponding key range. When the transaction is committed these conflict ranges are used to make sure transactions are serializable.

If every process reads and mutates the same key, then we are back to the same problem as the naive implementation. FoundationDB provides means to disable the generation of conflicts, but the responsibility is on the user to make sure it doesn’t compromise the consistency. Here snapshot reads and atomic updates are used to avoid the creation of unnecessary conflict ranges. More on conflicts will be explained later.

  defp search_candidate(t, search_range) do
    candidate = Enum.random(search_range)
    candidate_value = Transaction.get(t, {@reserve, candidate})
    Transaction.set_option(t, Option.transaction_option_next_write_no_write_conflict_range())
    Transaction.set(t, {@reserve, candidate}, 1)

    if is_nil(candidate_value) do
      Transaction.add_conflict_key(
        t,
        {@reserve, candidate},
        Option.conflict_range_type_write()
      )

      candidate
    else
      search_candidate(t, search_range)
    end
  end

search_candidate repeatedly tries to reserve a random value within the given window. The write conflict is only added if the value is not already reserved by some other process.

  defp clear_previous_window(t, start) do
    Transaction.clear_range(
      t,
      KeyRange.range({@counter}, {@counter, start}, %{begin_key_prefix: :first})
    )

    Transaction.set_option(t, Option.transaction_option_next_write_no_write_conflict_range())

    Transaction.clear_range(
      t,
      KeyRange.range({@reserve}, {@reserve, start}, %{begin_key_prefix: :first})
    )
  end
end

There are two main aspects for any algorithm: correctness and performance

Correctness

What is correctness? From the algorithm perspective same prefix value shouldn’t get allocated to two different processes. In a typical scenario the atomicity and serializable isolation provided by the database will make the job easy. We can assume that operations of different transactions won’t get interleaved. But as we mentioned earlier, to reduce the number of conflicts we are explicitly instructing the database to not consider the conflicts produced by some of the reads and writes. So the burden is now on us to make sure we haven’t compromised the correctness for performance.

Instead of proving the correctness in a rigorous way, I am going to reason in a semi-formal way.

At any point in time, the algorithm would be in two state

  1. There are enough unfilled values within in the current window. All the processes would be able to allocate without advancing the window. As the conflict range is set on the reserved candidate, only one of the processes would succeed.

  2. There are not enough unfilled values within in the current window. One or more processes might try to advance the current window.

Let’s think of a scenario which might break the correctness

Process 1 Process 2 Process 3
init init  
allocate x (window 1)    
commit    
    init
    clear window 1
    allocate y (window 2)
    commit
  allocate x (window 1)  
  commit  

Process 1 and 2 both starts at the same time, and x is allocated to process 1. Process 3 finds the window half filled and advances the window by clearing the current one. Process 2 tries to allocate the same value x as Process 1.

Let’s look at the exact call sequence before trying to reason why it would not work. FoundationDB client has options to trace transactions. An edited version of the xml trace file is shown below.

{OP: 'GetVersion', ID: 'process 1'}
{OP: 'GetVersion', ID: 'process 2'}
{OP: 'GetRange', ID: 'process 1', RangeSizeBytes: '0', StartKey: 'hcax14x00', EndKey: 'hcax14xffx00'}
{OP: 'GetRange', ID: 'process 2', RangeSizeBytes: '0', StartKey: 'hcax14x00', EndKey: 'hcax14xffx00'}
{OP: 'Get', ID: 'process 1', ValueSizeBytes: '0', Key: 'hcax15x01x14'}
{OP: 'Commit_ReadConflictRange', ID: 'process 1', Begin: 'hcax15x01x14', End: 'hcax15x01x14x00'}
{OP: 'Commit_WriteConflictRange', ID: 'process 1', Begin: 'hcax14x14', End: 'hcax14x14x00'}
>>>>>>{OP: 'Commit_WriteConflictRange', ID: 'process 1', Begin: 'hcax15x01x14', End: 'hcax15x01x14x00'}
{OP: 'Commit_Mutation', ID: 'process 1', Mutation: 'code: AddValue param1: hcax14x14 param2: x01x00x00x00x00x00x00x00'}
{OP: 'Commit_Mutation', ID: 'process 1', Mutation: 'code: SetValue param1: hcax15x01x14 param2: x01x00x00x00x00x00x00x00'}
{OP: 'Commit', ID: 'process 1', NumMutations: '2'}
{OP: 'GetVersion', ID: 'process 3'}
{OP: 'GetRange', ID: 'process 3', RangeSizeBytes: '13', StartKey: 'hcax14x00', EndKey: 'hcax14xffx00'}
{OP: 'Get', ID: 'process 3', ValueSizeBytes: '0', Key: '[email protected]'}
{OP: 'Commit_ReadConflictRange', ID: 'process 3', Begin: '[email protected]', End: '[email protected]'}
{OP: 'Commit_WriteConflictRange', ID: 'process 3', Begin: 'hcax14x14', End: 'hcax14x14x00'}
{OP: 'Commit_WriteConflictRange', ID: 'process 3', Begin: '[email protected]', End: '[email protected]'}
{OP: 'Commit_WriteConflictRange', ID: 'process 3', Begin: '[email protected]', End: '[email protected]'}
{OP: 'Commit_Mutation', ID: 'process 3', Mutation: 'code: ClearRange param1: hcax14x00 param2: [email protected]'}
{OP: 'Commit_Mutation', ID: 'process 3', Mutation: 'code: ClearRange param1: hcax15x01x00 param2: [email protected]'}
{OP: 'Commit_Mutation', ID: 'process 3', Mutation: 'code: AddValue param1: [email protected] param2: x01x00x00x00x00x00x00x00'}
{OP: 'Commit_Mutation', ID: 'process 3', Mutation: 'code: SetValue param1: [email protected] param2: x01x00x00x00x00x00x00x00'}
{OP: 'Commit', ID: 'process 3', NumMutations: '4'}
{OP: 'Get', ID: 'process 2', ValueSizeBytes: '0', Key: 'hcax15x01x14'}
>>>>>>{OP: 'CommitError_ReadConflictRange', ID: 'process 2', Begin: 'hcax15x01x14', End: 'hcax15x01x14x00'}
{OP: 'CommitError_WriteConflictRange', ID: 'process 2', Begin: 'hcax14x14', End: 'hcax14x14x00'}
{OP: 'CommitError_WriteConflictRange', ID: 'process 2', Begin: 'hcax15x01x14', End: 'hcax15x01x14x00'}
{OP: 'CommitError_Mutation', ID: 'process 2', Mutation: 'code: AddValue param1: hcax14x14 param2: x01x00x00x00x00x00x00x00'}
{OP: 'CommitError_Mutation', ID: 'process 2', Mutation: 'code: SetValue param1: hcax15x01x14 param2: x01x00x00x00x00x00x00x00'}
{OP: 'CommitError', ID: 'process 2', Err: 'Transaction not committed due to conflict with another transaction'}

FoundationDB uses multi version concurrency control for reads. Internally it stores multiple versions of the same key – for a short period of time. When a transaction starts, the first read will get the current version of the database as the read version of the transaction. This is indicated by the {OP: 'GetVersion'} in the trace. Any further reads from the transaction will not observe any writes that happens after that.

timeline

A transaction conflicts if it reads a key that has been written between the transaction’s read version and commit version. Because Process 1 has committed before Process 2 is commited, Process 2’s read on {@reserve, candidate} key conflicts with Process 1’s write on the same key. The commit by Process 2 would get cancelled and it should retry the whole process again. This is indicated by the last entry {OP: 'CommitError'... in the trace log. Once a transaction gets the read version, it can’t observe any other concurrent writes by other processes. If Process 2 has started after Process 1 had committed, then the read on {@reserve, candidate} would return non nil even if Process 3 has cleared the key range concurrently.

This effectively means concurrent clear won’t cause problem.

  1. The transactions that have started before the commit of Process 3 would try to allocate within the last window. They will not observe the clear range operation.

  2. The transactions that have started after the commit of Process 3 would try to allocate within the new window

Multiple processes trying to advance the window concurrently is not a problem from correctness perspective. They would still conflict with each other if they try to allocate the same candidate. Because of the serlizable isolation, we can assume there would be a total order across all commits, ie commit won’t happen concurrently.

Performance

From the graphs it’s clear that the high contention allocator algorithm scales linearly without any problem.

⇐ Sort order preserving serialization   |   Introducing exq-scheduler, a distributed scheduler for Sidekiq ⇒

About ActiveSphere

We help businesses build and scale complex web-apps, visualizations and high-throughput messaging systems. Check out some of our other open source projects or our work portfolio.