Distributed Linearizability without Consensus

TL;DR

This is an ambitious project (difficult to test at the scale of Spanner or DynamoDB), and my consideration of edge cases is not exhaustive, especially at that scale. But maybe people will pitch in!

It is a multi-leader replication method that, I think, will let me guarantee:

  1. Linearizability
  2. Data Durability
  3. Optimized writes (vs quorum or consensus based approaches)
  4. Massive scale
  5. No sharding issues

There are multiple leaders in the system — a configurable number. The nodes in the system are divided into multiple groups — again a configurable number. Ordering is provided by an external distributed lock.

  1. Each group contains at least one leader. Writes can arrive at any node and are replicated to at least one leader in the group.
  2. Each leader is part of multiple groups, allowing the number of leaders to be independent of the number of groups. Reads are done by reading from every leader and ordering the results to get both the sequence and the latest value.

I’m currently pondering the distributed part of the distributed streaming database or the data mesh. And my first thought was Raft. And why not! I tried to build it a few years back. I have a basic understanding of the algorithm. And a desire to poke around in it. Also, there are great crates, including the async-raft crate, for building Raft-based systems.

But then I started thinking a little deeper. What do I really need from a database? In my mind, beyond the basics of storing and retrieving data, I want:

  1. Durability
  2. Monotonic reads
  3. The ability to optimize for reads or writes (the trade-off between them).

I mean do we really require a consensus amongst all nodes here?

Single-leader consensus algorithms quickly become a bottleneck as throughput and/or cluster size increase. Multi-leader consensus algorithms are notoriously hard. There are other options like Google’s TrueTime (not available to the general public as far as I can tell) and Huygens, which is also the basis of CRaft — a multi-leader version of Raft. But the more I thought about it, the more I convinced myself that consensus was overkill for this use case.

But if I give up on consensus, what about problems like write conflicts, failure protection and so on? Well, I could use a quorum-based approach instead of writing data everywhere. The idea behind a quorum is that you write to w nodes and read from r nodes. If w + r > n, where n is the number of nodes in the cluster, then every read set overlaps every write set, so you are guaranteed to see the latest data. But this still does not solve the problems I set out to solve. If there are 10,000 nodes in the cluster then w and r are quite large indeed! The configurability of the trade-off between read and write speed is limited by n. Plus, quorums by themselves do not really guarantee consistent reads — look up sloppy quorums and hinted handoff.
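To put a number on that, here is the quorum arithmetic (the cluster size is only for illustration):

```rust
// Smallest balanced quorum satisfying w + r > n.
fn min_balanced_quorum(n: u32) -> (u32, u32) {
    let q = n / 2 + 1; // w = r = floor(n/2) + 1 guarantees overlap
    (q, q)
}

fn main() {
    let (w, r) = min_balanced_quorum(10_000);
    // Prints "w = 5001, r = 5001": every write must touch 5,001 nodes and
    // every read must contact 5,001 nodes before it can be trusted.
    println!("w = {w}, r = {r}");
}
```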

What about multi-leader read/writes then?

You can choose the number of leaders for the cluster; read from all the leaders, and write to any node while replicating the write to at least one (other) leader. This replication can be synchronous, providing a guarantee on durability — the write exists on at least two nodes.

The number of leaders in the system defines the number of reads that need to happen — thus the cost of reads. And the replication factor, how many nodes you want to replicate to before providing acknowledgement, defines the number of writes — thus the cost of writes. Of course, one can still have more than this number of writes happening asynchronously — so there can be a synchronous and an asynchronous replication factor.

The synchronous replication factor is the data durability guarantee we provide. The asynchronous replication factor is best effort.
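As a sketch of what the write path could look like under those two knobs (every type and helper here is hypothetical, purely to illustrate the synchronous/asynchronous split):

```rust
// Hypothetical write path: store locally, replicate synchronously to a few
// leaders, acknowledge, then let background replication handle the rest.

struct ClusterConfig {
    sync_replication_factor: usize,  // leaders the write must reach before we ack
    async_replication_factor: usize, // best-effort background copies
}

struct Write {
    key: String,
    value: Vec<u8>,
}

// Stubs standing in for storage and networking.
fn store_locally(_w: &Write) -> Result<(), String> { Ok(()) }
fn replicate_sync(_leader: &str, _w: &Write) -> Result<(), String> { Ok(()) }
fn replicate_async(_leader: &str, _w: &Write) { /* fire and forget */ }

fn handle_write(cfg: &ClusterConfig, w: Write, leaders: &[String]) -> Result<(), String> {
    // Any node can accept the write; persist it locally first.
    store_locally(&w)?;

    // Synchronous replication: this is the durability guarantee, since the
    // write now exists on at least sync_replication_factor + 1 nodes.
    for leader in leaders.iter().take(cfg.sync_replication_factor) {
        replicate_sync(leader, &w)?;
    }

    // Asynchronous replication is best effort and does not delay the ack.
    for leader in leaders
        .iter()
        .skip(cfg.sync_replication_factor)
        .take(cfg.async_replication_factor)
    {
        replicate_async(leader, &w);
    }

    Ok(()) // acknowledge the client only after the synchronous copies succeed
}

fn main() {
    let cfg = ClusterConfig { sync_replication_factor: 1, async_replication_factor: 1 };
    let leaders = vec!["node-3".to_string(), "node-6".to_string()];
    let w = Write { key: "k".into(), value: b"v".to_vec() };
    handle_write(&cfg, w, &leaders).unwrap();
}
```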

What about the timestamp though? If we don’t have consensus, how do we order our data? This is a hard problem. The options that seem, to me, like they should work are:

  1. Implement Huygens clock synchronization.
  2. A distributed lock generating a sequence number.

In this case, I went with a Redis cache. The observation is that Redis’s increment operation (INCR) is atomic, so a Redis cache somewhere in the cluster can hand out sequence numbers that are guaranteed to be unique, and increasing, across the cluster. Any node can accept a write, ask the Redis cache for the next sequence number and replicate the write to one of the leaders. When reads come in, they read from all the leaders and order the data by these sequence numbers. The drawback of this method is that we can’t use it if our cluster grows to span more than one data center.
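A minimal sketch of what I mean, using the redis crate; the connection URL and key name are placeholders:

```rust
use redis::Commands;

// Fetch the next cluster-wide sequence number. INCR is atomic in Redis,
// so no two nodes can ever be handed the same number.
fn next_sequence_number(client: &redis::Client) -> redis::RedisResult<u64> {
    let mut con = client.get_connection()?;
    con.incr("cluster:sequence", 1u64)
}

fn main() -> redis::RedisResult<()> {
    // Placeholder address; in the cluster this would be the shared cache.
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let seq = next_sequence_number(&client)?;
    println!("tagging write with sequence number {seq}");
    Ok(())
}
```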

The other hard problem in this case is failover. Imagine our replication factor is 2 and an arbitrary node fails. We have no way of knowing what data it held and, therefore, what has to be replicated onto the new leader. The simplest solution to this is to add a little logical structure to our cluster. Let’s divide the cluster into logical groups with a leader in each group, as in this diagram.

Grouping inside the cluster

In this example, there are 10 nodes in the cluster divided into 3 overlapping groups — blue, red and green. Node 3 and node 6 are the leaders in the cluster. Node 3 belongs to both the blue and red groups while node 6 belongs to the red and green groups. Any write to the blue group is replicated to node 3, any write to the green group is replicated to node 6 and any write to the red group is replicated to both nodes 3 and 6. Nodes 3 and 6 can then asynchronously replicate the writes within the group.

While we could simply have had 3 disjoint subgroups with 3 leaders here, there is a slight benefit to intersecting the groups — we can pick the number of groups, which dictates the asynchronous replication factor, and the number of leaders, which dictates the number of reads, independently. So more intersection between groups means fewer leaders, and thus faster reads, without having to replicate to a large number of nodes.

The leaders are the central nodes in the cluster with a high degree of connectivity.
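To make the routing concrete, here is a tiny sketch of the example above. The non-leader memberships are invented, since the diagram only pins down the leaders:

```rust
use std::collections::HashMap;

fn main() {
    // Which leaders each group replicates its writes to (from the example:
    // node 3 leads blue and red, node 6 leads red and green).
    let group_leaders: HashMap<&str, Vec<u32>> = HashMap::from([
        ("blue", vec![3]),
        ("red", vec![3, 6]),
        ("green", vec![6]),
    ]);

    // Invented non-leader membership, purely for illustration.
    let group_members: HashMap<&str, Vec<u32>> = HashMap::from([
        ("blue", vec![1, 2, 3, 4]),
        ("red", vec![3, 5, 6]),
        ("green", vec![6, 7, 8, 9, 10]),
    ]);

    for (group, leaders) in &group_leaders {
        let members = &group_members[*group];
        println!(
            "a write to any of {:?} in the {group} group is synchronously \
             replicated to leader(s) {:?}",
            members, leaders
        );
    }

    // Reads, by contrast, always fan out to every leader: nodes 3 and 6 here.
}
```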

With that background, now is the time to talk about leader election. We can borrow a bit from Raft here:

  1. We borrow the idea of terms from Raft. Every term has a leader and is identified by a number.
  2. The leader broadcasts a heartbeat to all nodes in the group.
  3. Every node has a timer with a random timeout duration that is always greater than the heartbeat interval (which is constant for the entire cluster) plus some buffer.
  4. The first node whose timer expires without receiving a heartbeat from the leader increments the term ID and broadcasts itself as the leader. The longer the timeout duration (that is, the more buffer there is), the longer the nodes will wait for the leader to return, which might be of some value in container-based environments. In case of a clash, a new term is started.

This is not a simple process, but electing the group leader is the one place where we do need consensus.
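A rough sketch of the timer logic on each group member, assuming the rand crate for jitter; networking and message passing are left out:

```rust
use rand::Rng; // rand crate, for the randomized election timeout
use std::time::{Duration, Instant};

/// Heartbeat interval, constant for the whole cluster.
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(150);
/// Minimum extra buffer on top of the heartbeat interval.
const ELECTION_BUFFER_MS: u64 = 150;

struct GroupMember {
    term: u64,
    last_heartbeat: Instant,
    election_timeout: Duration,
}

impl GroupMember {
    fn new() -> Self {
        GroupMember {
            term: 0,
            last_heartbeat: Instant::now(),
            election_timeout: Self::random_timeout(),
        }
    }

    /// Always greater than the heartbeat interval plus a buffer, with a
    /// random component so that members rarely time out simultaneously.
    fn random_timeout() -> Duration {
        let jitter = rand::thread_rng().gen_range(0..ELECTION_BUFFER_MS);
        HEARTBEAT_INTERVAL + Duration::from_millis(ELECTION_BUFFER_MS + jitter)
    }

    /// Called whenever a heartbeat arrives from the current leader.
    fn on_heartbeat(&mut self, term: u64) {
        self.term = self.term.max(term);
        self.last_heartbeat = Instant::now();
        self.election_timeout = Self::random_timeout();
    }

    /// Called periodically. Returns the new term if this member decides to
    /// declare itself leader (it would then broadcast that to the group).
    fn tick(&mut self) -> Option<u64> {
        if self.last_heartbeat.elapsed() > self.election_timeout {
            self.term += 1;
            self.last_heartbeat = Instant::now();
            return Some(self.term);
        }
        None
    }
}

fn main() {
    let mut member = GroupMember::new();
    member.on_heartbeat(1);
    if let Some(term) = member.tick() {
        println!("declaring leadership for term {term}");
    }
}
```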

The last piece of the architecture is leader failover. With the above grouping idea, failover now becomes somewhat simpler. An existing node becomes the leader, and the deltas, that is, the data present in other nodes of the groups the new leader belongs to but not in the leader itself, are copied over. Remember that a leader might be part of multiple groups, so this could be an expensive operation. This is a big downside of the algorithm. As of now, I do not see a way around it, and it might be important since we expect a lot of “churn” (nodes leaving and entering the cluster) in container-based environments. In the case of non-leader nodes failing, however, no data copy is required.

The final question here is whether to still accept writes while a leader is being elected. The naive logic is to suspend writes to the group during the election, since any new writes would not be available for reads. But then all the data that resided only on the failed leader is also unavailable. This situation can be easily avoided by ensuring that no group has only a single leader. Having every group contain more than one leader also means that we can avoid some of the replication price paid by a leader failover, as the new leader only has to replicate from the other leaders in the groups it is a part of instead of from all the nodes in all the groups. However, this will increase the number of leaders in the system, penalizing reads a little more and increasing the cost of writes as well, since writes have to be replicated to multiple leaders before being acknowledged.

Furthermore, there might be opportunities to optimize the replication. In our case, we could devise something like the following (sketched in code after the list):

  1. A new leader, L-dest, comes online.
  2. It identifies another leader, L-source, to copy data from.
  3. It sends a message to L-source.
  4. It creates a WAL and a MemTable and starts processing new writes.
  5. On L-source, upon receiving the signal, flush the MemTable to an SSTable.
  6. Compact the existing SSTables as far as possible.
  7. Combine the SSTables into a snapshot and send it over to L-dest.
  8. L-dest expands the snapshot and saves the SSTables.
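A sketch of those steps from L-dest’s point of view; every type and helper below is a stand-in for the storage engine’s WAL/MemTable/SSTable machinery rather than anything that exists yet:

```rust
// Stand-in types for the storage engine's building blocks.
struct Wal;
struct MemTable;
struct Snapshot;

fn open_wal() -> Wal { Wal }
fn new_memtable() -> MemTable { MemTable }

// Steps 5-7 happen on L-source when it receives this request: flush the
// MemTable to an SSTable, compact, and bundle everything into a snapshot.
fn request_snapshot(source: &str) -> Snapshot {
    println!("asking {source} for a compacted snapshot");
    Snapshot
}

// Step 8: unpack the snapshot and register its SSTables locally.
fn install_snapshot(_snap: Snapshot) {
    println!("snapshot installed; older data is now readable on L-dest");
}

// Steps 1-4 and 8, as seen by the newly elected leader (L-dest).
fn become_leader(source_leader: &str) {
    // Start accepting new writes immediately through a fresh WAL + MemTable,
    // so the catch-up below does not block the write path.
    let _wal = open_wal();
    let _memtable = new_memtable();

    let snapshot = request_snapshot(source_leader);
    install_snapshot(snapshot);
}

fn main() {
    become_leader("node-3");
}
```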

But how does this differ from sharding?

  1. Users do not need to choose a shard key or make any other decisions about how the sharding is done. Any write can go to any node.
  2. Uneven data distribution, human error and the addition of new shards do not cause issues with this architecture.

However, reads in this architecture are more expensive than in sharded databases, since in our case data needs to be read from all “shards” and then combined.

Thus, except for the possibility of a high-cost leader failover, this algorithm provides:

  1. Monotonic reads.
  2. Data durability.
  3. The ability to optimize writes by sacrificing some durability.
  4. Disassociating the read/write costs from the size of the cluster.

Implementation

[1] Multileader Election

[2] Cluster Implementation 1


Ratnadeep Bhattacharya (https://www.rdeebee.com/)

Distributed Systems researcher and engineer (grad student) at The George Washington University!