Distributed Sequencing

Ordering inputs or events is a recurring problem in distributed systems, and it becomes much easier when some component can hand out monotonically increasing numbers. This is a relatively short article on obtaining such a monotonically increasing value in a distributed system, first with Redis (Go) and then with etcd (Rust).

This is a part of my larger series on a data mesh.

The Go and Redis part is straightforward, so I will explain it first. It uses the redigo library. We start by defining a struct for convenience.

type Seq struct {
    Sequence uint64 `json:"Sequence"`
}

We then define a struct to hold the Redis connection pool, along with its constructor:

type redisConn struct {
    pool *redis.Pool
}

func newRedisConn() redisConn {
    pool := newPool()
    return redisConn{pool}
}

func newPool() *redis.Pool {
    return &redis.Pool{
        MaxIdle:   80,
        MaxActive: 1200,
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", "localhost:6379")
            if err != nil {
                log.Panic(err)
            }
            return c, nil
        },
    }
}

Getting a connection from the pool:

func getConnection(pool *redis.Pool) redis.Conn {
    conn := pool.Get()

    if _, err := conn.Do("AUTH", "p@ssw0rd"); err != nil {
        log.Panic(err)
    }
    return conn
}

Note that the pool is passed by pointer: copying a redis.Pool by value also copies its internal mutex, which is a bug.

And using the Redis INCR command (deferring conn.Close() returns the connection to the pool when we are done):

func (r *redisConn) cas(svc string) uint64 {
    conn := getConnection(r.pool)
    defer conn.Close()
    val, err := redis.Uint64(conn.Do("INCR", svc))
    if err != nil {
        log.Panic(err)
    }
    return val
}

However, if you go this route, do read Martin Kleppmann's excellent blog post on distributed locking.

Next, we do the same thing with etcd and Rust. It is not as straightforward with etcd, but I may have figured something out while working on my distributed multi-leader election.

The idea is to use a lease to hold a lock, and to fetch from a second key only while that lock is held. One could store the actual counter value in this second key, but that is both more expensive and more complicated. An easier workaround is to use the revision number returned with each fetch.

The setup of the program is as follows:

// Environment variables.
let trace_level = match env::var("TRACE_LEVEL")
    .expect("Trace level undefined")
    .as_str()
{
    "TRACE" | "Trace" | "trace" => Level::TRACE,
    "INFO" | "Info" | "info" => Level::INFO,
    "DEBUG" | "Debug" | "debug" => Level::DEBUG,
    "WARN" | "Warn" | "warn" => Level::WARN,
    "ERROR" | "Error" | "error" => Level::ERROR,
    _ => Level::TRACE,
};

// Set up tracing.
let subscriber = FmtSubscriber::builder()
    .with_max_level(trace_level)
    .finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let mut client = Client::connect(["localhost:30327"], None)
    .await
    .expect("failed to create client");

let resp = client
    .put(COUNTER_KEY, "0", None)
    .await
    .expect("failed to put value");
info!("Initialized counter: {:#?}", resp);

let lease = client
    .lease_grant(LEASE_TTL, None)
    .await
    .expect("lease failed");
let lease_id = lease.id();

let getoptions = GetOptions::new().with_prefix();
let resp = client
    .get(LOCK_KEY_PREFIX, Some(getoptions.clone()))
    .await
    .expect("failed to get lock key");
info!("Lock key search resp: {:#?}", resp);

The actual work is done here (the idea is very similar to a mutex):

if resp.kvs().is_empty() {
    // Create lock options to hold a lease on the lock.
    let lock_options = LockOptions::new().with_lease(lease_id);
    // Acquire the lock under LOCK_KEY_PREFIX.
    let resp = client
        .lock(LOCK_KEY_PREFIX, Some(lock_options))
        .await
        .expect("failed to get lock");
    debug!("Locked key resp: {:#?}", resp);

    // Store the actual (unique) lock key from the response.
    let lock_key = str::from_utf8(resp.key()).expect("failed to get lock key");
    let _ = client
        .put(COUNTER_KEY, "1", None)
        .await
        .expect("failed to put counter");

    // Query the COUNTER_KEY (the second key).
    let resp = client
        .get(COUNTER_KEY, None)
        .await
        .expect("failed to get counter");
    // Get the revision number from this call.
    info!(
        "Counter key revision: {:#?}",
        resp.header().unwrap().revision()
    );

    // Unlock the key.
    client.unlock(lock_key).await.expect("failed to unlock key");
    tokio::time::sleep(Duration::from_secs(1)).await;
}

In this version, the revision numbers skip values in between. But that generally does not matter for simpler ordering requirements, where only the relative order is needed.

We could simply skip the unlocking step. That returns revision numbers in order when running a single process, but it hangs when tested with two concurrent processes, since the second process blocks forever waiting for a lock that is never released.

The revision number does not change if we only repeatedly read the counter key, since etcd advances the revision only on writes.

Finally, both approaches might have other issues that only surface under further testing, in which case we would have to fall back to compare-and-swapping the actual counter value in the second key while holding the lock.


Ratnadeep Bhattacharya (https://www.rdeebee.com/)

Distributed Systems researcher and engineer (grad student) at The George Washington University!