Data Mesh Clustering 1 — Part 11

In this article I start implementing the clustering part of this series, following the plan outlined earlier. Unlike my previous articles, I will describe the implementation in general terms rather than paste all of the code into the article, mainly because the codebase has grown. You can check out the code directly.

The basic idea here is:

  1. When a node starts up, it reads the input values needed to set up the cluster from a configuration file.
  2. Once the node has started, it obtains a lease from the etcd cluster. This lease is used to trigger and run leader elections and to act as a heartbeat for group members.
  3. The cluster also uses a few key prefixes to run the different elections, join groups, find out which groups have failed members, and so on.
  4. If the node is not a leader, it keeps its lease alive, watches the leaders (starting an election if a leader failure is detected), and watches the other peers in its group. Right now the members do not do anything with the group membership information.
  5. If the node is a group leader, it keeps its lease alive as a heartbeat and watches its peers. If a member fails, it advertises for a replacement member for the group. More operations might be added later.
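
To make steps 4 and 5 concrete, here is a minimal sketch of how a node might map what it observes to what it does. This is not the code from the repository: `Role`, `Event`, `Action` and `react` are names I made up for illustration, and the choice that a leader ignores the loss of another leader is purely an assumption.

```rust
/// Role of a node in its group (illustrative names, not from the codebase).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    Leader,
    Member,
}

/// Things a node can observe while watching etcd keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    /// The lease refresh timer fired.
    RefreshTick,
    /// A watched leader key disappeared because its lease expired.
    LeaderLost,
    /// A watched member key disappeared; carries the failed node's ID.
    MemberLost(usize),
}

/// What the node does in response to an event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    KeepLeaseAlive,
    StartElection,
    AdvertiseVacancy(usize),
    Ignore,
}

fn react(role: Role, event: Event) -> Action {
    match (role, event) {
        // Every node refreshes its lease; this doubles as the heartbeat.
        (_, Event::RefreshTick) => Action::KeepLeaseAlive,
        // A non-leader starts an election when a leader failure is detected.
        (Role::Member, Event::LeaderLost) => Action::StartElection,
        // Assumption: a sitting leader does not campaign again.
        (Role::Leader, Event::LeaderLost) => Action::Ignore,
        // A leader advertises for a replacement when a member fails.
        (Role::Leader, Event::MemberLost(id)) => Action::AdvertiseVacancy(id),
        // Members currently do nothing with membership information.
        (Role::Member, Event::MemberLost(_)) => Action::Ignore,
    }
}
```

Keeping the decision in a pure function like this makes it easy to test without a running etcd cluster; the actual watch and lease calls would live around it.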

If you just want to look at a single file to understand these operations, look at the node. Be warned that it is a fairly large file.

This is what the config.yaml looks like:

dbname: "RDeeBee"
groups: 10
group_size: 3
reads: 3
writes: 2
id_key: "id_key"
failover_id_key_prefix: "failover_id"

It translates directly into a config struct:

use serde::Deserialize;

/// PreConfig holds the user-defined config for the cluster.
#[derive(Debug, Deserialize, Clone)]
struct PreConfig {
    /// Name of the database.
    dbname: String,
    /// Number of groups in the system.
    groups: usize,
    /// The number of nodes in each group.
    group_size: usize,
    /// Number of nodes to be read for each read.
    /// This corresponds to the number of readers in the system.
    /// However, every leader also has a backup leader.
    reads: usize,
    /// Number of nodes any write has to be written to before the write
    /// can be acknowledged as committed.
    writes: usize,
    /// If the system or a new group is starting up, the nodes need new IDs.
    /// This is the key to query in order to obtain an ID.
    id_key: String,
    /// However, the node may also be replacing a failed node.
    /// This is the key prefix that the group leaders use to advertise failed nodes.
    failover_id_key_prefix: String,
}
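
Since these values drive everything else, it can help to sanity-check them right after loading. The following is only a sketch under my own assumptions: `Sizes`, `total_nodes` and `validate` are hypothetical names, and the rule that `writes` cannot exceed `group_size` assumes that a write is acknowledged by nodes within a single group.

```rust
/// Hypothetical mirror of the numeric fields of `PreConfig`.
#[derive(Debug, Clone, Copy)]
struct Sizes {
    groups: usize,
    group_size: usize,
    reads: usize,
    writes: usize,
}

/// Total number of nodes the cluster expects: groups times nodes per group.
fn total_nodes(s: &Sizes) -> usize {
    s.groups * s.group_size
}

/// Reject configurations that could never work.
fn validate(s: &Sizes) -> Result<(), String> {
    if s.groups == 0 || s.group_size == 0 {
        return Err("groups and group_size must be at least 1".to_string());
    }
    if s.reads == 0 || s.writes == 0 {
        return Err("reads and writes must be at least 1".to_string());
    }
    // Assumption: a write is acknowledged by nodes of one group, so it can
    // never be confirmed by more nodes than the group contains.
    if s.writes > s.group_size {
        return Err(format!(
            "writes ({}) exceeds group_size ({})",
            s.writes, s.group_size
        ));
    }
    Ok(())
}
```

With the values from the config.yaml above (10 groups of 3 nodes, 2 writes), this check passes and the cluster expects 30 nodes in total.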

It then gets extended into a larger config struct:

use std::collections::HashMap;

/// Config extends the user-defined configuration, figuring out the details needed to run the cluster.
#[derive(Debug, Clone)]
pub(crate) struct Config {
    preconf: PreConfig,
    /// List of groupings of groups.
    /// A list of lists, where each list corresponds to a combination of groups that have a common leader.
    groupings: Vec<Vec<usize>>,
    /// Map that informs which nodes map to which group.
    /// Each node makes an API call at startup to get its ID.
    /// This ID is then used to map it to a group.
    node_group_map: HashMap<usize, usize>,
    /// List of leaders and their corresponding groups.
    /// Maps each leader to the group combination it is a leader for.
    /// The key is the string representation of the leader's ServiceNode.
    /// The value is the index into the `groupings` field.
    leader_group_map: HashMap<String, usize>,
    /// Leader election prefixes corresponding to each group (index of `groupings`).
    /// Since each group has two leaders, there are two election prefixes for each group
    /// or combination thereof.
    /// The key is the index into the `groupings` field.
    election_prefixes: HashMap<usize, (String, String)>,
    /// While election prefixes hold the prefixes used to run the elections,
    /// leader keys hold the actual keys that the leaders update with their service node information
    /// upon winning an election.
    /// The key is the index into the `groupings` field.
    leader_keys: HashMap<usize, (String, String)>,
}
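
As a rough illustration of how the derived fields could be filled in, here is a sketch that builds a node-to-group map and the pair of election prefixes per grouping. It makes several assumptions that the real code may not share: node IDs are handed out sequentially from 0 and fill the groups in order, the two leaders of a grouping are numbered 0 and 1, and the grouping index is what goes into the key. The function names are hypothetical.

```rust
use std::collections::HashMap;

/// Assumption: IDs start at 0 and fill groups in order, so node `id`
/// lands in group `id / group_size`.
fn build_node_group_map(groups: usize, group_size: usize) -> HashMap<usize, usize> {
    (0..groups * group_size)
        .map(|id| (id, id / group_size))
        .collect()
}

/// Two election prefixes per grouping, one for each of its two leaders.
/// The format string matches the `election_key_prefix_gen!` macro.
fn build_election_prefixes(
    dbname: &str,
    groupings: &[Vec<usize>],
) -> HashMap<usize, (String, String)> {
    groupings
        .iter()
        .enumerate()
        .map(|(idx, _groups)| {
            let first = format!("election-{}-group-{}-leader-{}", dbname, idx, 0);
            let second = format!("election-{}-group-{}-leader-{}", dbname, idx, 1);
            (idx, (first, second))
        })
        .collect()
}
```

With 10 groups of 3 nodes, nodes 0 to 2 would land in group 0 and node 29 in group 9 under this scheme.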

We added a few macros to generate the keys.

#[macro_export]
macro_rules! election_key_prefix_gen {
    ($dbname:expr, $group:expr, $num:expr) => {
        format!("election-{}-group-{}-leader-{}", $dbname, $group, $num)
    };
}

#[macro_export]
macro_rules! leader_key_gen {
    ($dbname:expr, $group:expr, $num:expr) => {
        format!("leader-{}-group-{}-{}", $dbname, $group, $num)
    };
}

#[macro_export]
macro_rules! group_membership_key_gen {
    ($dbname:expr, $group:expr) => {
        format!("member-{}-group-{}", $dbname, $group)
    };
}

#[macro_export]
macro_rules! group_add_lock {
    ($group_id:expr) => {
        format!("group-add-lock-{}", $group_id)
    };
}

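#[macro_export]
macro_rules! id_key_lock {
    () => {
        // A plain literal; `stringify!` on a string literal would keep the
        // surrounding quote characters as part of the key.
        "id-key-lock"
    };
}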
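
To show what these macros produce, here is a small usage sketch. The macro bodies are copied from above (without `#[macro_export]`, so the snippet stands alone); the group number 2 and leader number 1 are arbitrary example values, and `example_keys` is a hypothetical helper.

```rust
macro_rules! election_key_prefix_gen {
    ($dbname:expr, $group:expr, $num:expr) => {
        format!("election-{}-group-{}-leader-{}", $dbname, $group, $num)
    };
}

macro_rules! leader_key_gen {
    ($dbname:expr, $group:expr, $num:expr) => {
        format!("leader-{}-group-{}-{}", $dbname, $group, $num)
    };
}

macro_rules! group_membership_key_gen {
    ($dbname:expr, $group:expr) => {
        format!("member-{}-group-{}", $dbname, $group)
    };
}

/// Generate one key of each kind for group 2 of the "RDeeBee" database.
fn example_keys() -> Vec<String> {
    vec![
        // "election-RDeeBee-group-2-leader-1"
        election_key_prefix_gen!("RDeeBee", 2, 1),
        // "leader-RDeeBee-group-2-1"
        leader_key_gen!("RDeeBee", 2, 1),
        // "member-RDeeBee-group-2"
        group_membership_key_gen!("RDeeBee", 2),
    ]
}
```

Because every key starts with a fixed word and the database name, a node can watch a whole family of keys by prefix.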

The node is defined as:

#[derive(Clone)]
pub struct Node {
    /// Client for the etcd cluster.
    client: Client,
    /// The node name and IP address of this node.
    svc_node: ServiceNode,
    /// The node ID of this node.
    node_id: Option<usize>,
    /// The group key where members register themselves to the group.
    group_key: Option<String>,
    /// The cluster configuration.
    config: Config,
    /// This node's etcd cluster lease ID.
    lease: i64,
    /// Time between lease refreshes.
    refresh_interval: u64,
    /// The type of node this is - leader or only member.
    nodetype: NodeType,
    /// If the node is a leader then it has to watch its peers.
    registry: Arc<RwLock<Option<Registry>>>,
}
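
The `lease` and `refresh_interval` fields work together: the lease only acts as a heartbeat if it is refreshed well before its TTL runs out. Below is a minimal local model of that timing, not the actual etcd calls. `LeaseClock` and its methods are hypothetical, and I am assuming both values are expressed in seconds.

```rust
use std::time::{Duration, Instant};

/// Hypothetical local view of a lease: when it was last refreshed,
/// how often we refresh it, and when the server would expire it.
struct LeaseClock {
    ttl: Duration,
    refresh_interval: Duration,
    last_refresh: Instant,
}

impl LeaseClock {
    /// Returns `None` if the refresh interval is not strictly shorter than
    /// the TTL, since such a lease would expire between refreshes.
    fn new(ttl_secs: u64, refresh_interval_secs: u64, now: Instant) -> Option<Self> {
        if refresh_interval_secs == 0 || refresh_interval_secs >= ttl_secs {
            return None;
        }
        Some(LeaseClock {
            ttl: Duration::from_secs(ttl_secs),
            refresh_interval: Duration::from_secs(refresh_interval_secs),
            last_refresh: now,
        })
    }

    /// True once a full refresh interval has passed since the last refresh.
    fn refresh_due(&self, now: Instant) -> bool {
        now.duration_since(self.last_refresh) >= self.refresh_interval
    }

    /// True once the TTL has passed without a refresh; peers watching the
    /// keys attached to the lease would then see them disappear.
    fn expired(&self, now: Instant) -> bool {
        now.duration_since(self.last_refresh) >= self.ttl
    }

    /// Record a successful keep-alive.
    fn mark_refreshed(&mut self, now: Instant) {
        self.last_refresh = now;
    }
}
```

Passing `now` in explicitly, instead of calling `Instant::now()` inside the methods, keeps the model deterministic and easy to test.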

The node then becomes a part of the server.

#[derive(Clone)]
pub(crate) struct RDeeBeeServer {
    rdeebee: Arc<RwLock<RDeeBee>>,
    cluster_node: Arc<RwLock<Node>>,
}

This allows the cluster and the database to run in different threads while being able to communicate.

let rdb_srv = RDeeBeeServer::new(COMPACTION_SIZE, DEEBEE_FOLDER.to_string()).await?;
// Start the cluster node
let node = rdb_srv.get_node();
std::thread::spawn(move || {
    info!("Starting cluster thread");
    let rt = tokio::runtime::Runtime::new().expect("Failed to start server runtime");
    let mut node = node.write();
    rt.block_on(async move {
        node.run_cluster_node().await.unwrap();
    })
});
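
The sharing pattern itself can be shown with nothing but the standard library. The sketch below is a simplified stand-in, not the server code: `ClusterState`, `Server` and `run_demo` are hypothetical, it uses `std::sync::RwLock` rather than whatever lock the codebase uses, and a counter stands in for the cluster work.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Stand-in for the cluster node's state.
#[derive(Debug, Default)]
struct ClusterState {
    heartbeats: u64,
}

/// Stand-in for the server that owns the shared handle.
struct Server {
    cluster: Arc<RwLock<ClusterState>>,
}

impl Server {
    fn new() -> Self {
        Server {
            cluster: Arc::new(RwLock::new(ClusterState::default())),
        }
    }

    /// Hand out another reference to the same state.
    fn get_node(&self) -> Arc<RwLock<ClusterState>> {
        Arc::clone(&self.cluster)
    }
}

/// Run a "cluster thread" that records `beats` heartbeats, then read the
/// result back from the owning side.
fn run_demo(beats: u64) -> u64 {
    let srv = Server::new();
    let node = srv.get_node();
    let handle = thread::spawn(move || {
        for _ in 0..beats {
            // Take the write lock only for the duration of one update.
            node.write().expect("lock poisoned").heartbeats += 1;
        }
    });
    handle.join().expect("cluster thread panicked");
    let seen = srv.cluster.read().expect("lock poisoned").heartbeats;
    seen
}
```

One design point worth keeping in mind: the sketch takes the write lock briefly for each update. A write guard that is held for the whole lifetime of the cluster thread would block every other thread that tries to read or write the same node.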


Ratnadeep Bhattacharya (https://www.rdeebee.com/)

Distributed Systems researcher and engineer (grad student) at The George Washington University!