Implementing Practical Byzantine Fault Tolerance - Part 3

Part 3: Protocol Implementation

This is the final part of my brief description of the pBFT protocol implementation. The first one gave a general overview of the protocol, and the second one covered a high-level view of the project and its different parts. In this one, we are going to take a very quick tour of the normal case implementation and look more closely at view changes.

Normal Case Operations

I covered the more important parts of normal case operations in part one, together with the handy diagram featured in the paper, and the code for the majority of that is fairly simple. There are still some gotchas to be careful about, however.

In the previous part, I introduced PbftState, where all consensus-related data lives, and PbftExecutor, which implements the logic. For normal case operations, besides things like checking the replica state, validating messages, and making sure the sequence number falls between the watermarks, we simply add messages to the log and act once we collect 2f + 1 of them.

// Excerpt from fn process_prepare

// Since we are not guaranteed to receive pre-prepare prior to receiving first prepare messages,
// we might need to create a new entry in the log based on the prepare message.
let entry = log
    .entry(idx)
    .or_insert(RequestConsensusState::new(&message_meta));

// If we received message for a given view and sequence number with different digest,
// we reject it.
if message_meta.digest != entry.digest {
    return Err(Error::PrepareForViewAndSequenceDoesNotMatchDigest {
        view: message_meta.view,
        sequence: message_meta.sequence,
        expected: entry.digest.clone(),
        actual: message_meta.digest,
    });
}

// If we already have Prepare message from this replica, we do not add it again.
if !entry
    .prepare
    .iter()
    .any(|m| m.replica_id == prepare.replica_id)
{
    entry.prepare.push(prepare.clone());
}

// Check if replica is prepared
if entry.is_prepared(self.config.node_config.nodes.len())
    && state.message_store.has_message(message_meta.sequence)
{
    // Broadcast Commit if not done already...
}
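
For completeness, the is_prepared check used in the excerpt above boils down to a quorum count. A minimal sketch of what it could look like (not the project's exact code; it assumes the log entry also keeps the leader's PrePrepare in a pre_prepare field, which the excerpt does not show):

impl RequestConsensusState {
    // Sketch only: f is derived from the cluster size n (n >= 3f + 1), and we
    // follow the paper's "prepared" predicate: a matching PrePrepare plus 2f
    // Prepare messages from different replicas.
    fn is_prepared(&self, n: usize) -> bool {
        let f = (n - 1) / 3;
        self.pre_prepare.is_some() && self.prepare.len() >= 2 * f
    }
}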

Since different messages may arrive at different times, we need to make sure not to rely on their order. For example, a Prepare message from some replica may arrive before the PrePrepare from the leader, or a Prepare for sequence n + 1 may arrive before the one for n.

When the replica finally collects 2f + 1 Commit messages, it is ready to apply the operation. However, this time the order matters. Since the whole protocol is about agreeing on message ordering in a distributed system, we need to make sure that the order is preserved when modifying the state. So if message n has not yet been applied, we cannot apply n + 1 even if we have a sufficient number of Commits.

NOTE: In case it is not clear – applying messages in a different order on different replicas may lead to a different state. Say message n sets fluffiness: 10 and message n + 1 sets fluffiness: 0. If one replica preserves the order, but the other applies n + 1 prior to n, they will no longer return the same fluffiness (which, as you might imagine, is a pretty serious problem).

fn apply_messages(
    &self,
    state: &mut PbftState,
) -> Result<(Vec<SignedCheckpoint>, Vec<ClientResponse>)> {
    // Start from state.last_applied
    let last_applied = &mut state.last_applied_seq;

    // Because we are using a BTreeMap, we can iterate over it in order of
    // sequence number.
    let start = state
        .consensus_log
        .iter()
        .position(|(idx, _)| idx.sequence > *last_applied);
    if start.is_none() {
        return Ok((vec![], vec![]));
    }
    ...
    for (idx, entry) in state.consensus_log.iter().skip(start.unwrap()) {
        // Entry is not committed locally, we should not apply it, and we
        // cannot proceed further.
        if !entry.is_committed_local(self.config.node_config.nodes.len()) {
            // It is possible to have entries with the same sequence in
            // different views due to the View Change protocol, this may
            // result in entry never being committed, hence we cannot simply
            // break the loop here.
            continue;
        }
        // In case we have applied a message with the same sequence, we skip
        // it. This can happen due to view change -- having a consensus 
        // entry for the same message (sequence) in different views.
        if idx.sequence == *last_applied {
            continue;
        }
        if idx.sequence > *last_applied + 1 {
            break;
        }
        
        // Apply messages...
    }

The other thing to be careful with is that there might be log entries with the same sequence but a different view number (if a view change occurred).
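
As an aside, the consensus log key could look roughly like the struct below (a hypothetical sketch; the actual name and fields in the project may differ). Ordering by sequence first is what lets the BTreeMap iteration in apply_messages walk entries in sequence order, with same-sequence entries from different views sitting next to each other:

// Hypothetical log key, shown only to illustrate the ordering used above.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct ConsensusLogIdx {
    pub sequence: u64,
    pub view: u64,
}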

If all those things add up, however, we can at last apply the operation to the state:

...
// Take a write lock
let mut state_machine = self.state_machine.write().unwrap();
...

// Make sure that the digest matches - this should always
// be the case, hence we simply assert.
assert!(entry.digest == store_msg.digest());

let result = state_machine.apply_operation(store_msg.operation());
*last_applied += 1;
assert!(*last_applied == idx.sequence);

// Stop View Change timer if this message started it
if let Some(timer) = &state.timer {
    // The message that started the timer was applied,
    // so we can stop the timer.
    if timer.trigger_digest == store_msg.digest() {
        self.reset_timer(&mut state.timer);
    }
}

store_msg.set_operation_result(result.clone());
...

The process of applying the operation is finished by storing the result (so that it can be returned in case of request retransmission) and sending the response to the clients (the KV Service nodes, at /api/v1/client/response).
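
Delivering the response is just an HTTP call to that endpoint. A rough sketch (not the project's actual code), assuming we know the client node's address and that ClientResponse derives serde::Serialize:

// Hypothetical delivery helper using reqwest (with the "json" feature enabled).
async fn send_client_response(
    client_addr: &str,
    response: &ClientResponse,
) -> Result<(), reqwest::Error> {
    let url = format!("{client_addr}/api/v1/client/response");
    reqwest::Client::new().post(url).json(response).send().await?;
    Ok(())
}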

Checkpoints

A concept related to applying messages is checkpoints. Since they are crucial for the view change protocol that we are inevitably getting closer to, let me share a sentence or two about them.

NOTE: See section “4.3 Garbage Collection” in the paper.

All replicas periodically take a checkpoint of their state and share the proof with others. As the paper's section title suggests, this is useful for reducing the number of stored protocol messages. Additionally, checkpoints play an important role in the view change protocol, as a stable boundary from which the new view can begin.

Checkpoints are taken every set number of applied messages; for this implementation, it is 10 by default. Since everything lives in memory, the task is far easier than in a real system, as we do not need to copy any bytes on disk (and since we also do not really worry about load, memory usage, bla bla bla…).

The state machine checkpoint method is very naive: it serializes the state to JSON and bluntly returns it, so that we can easily produce a digest from it.

fn checkpoint(&self, _sequence: u64) -> Result<String, Error> {
    let checkpoint =
        serde_json::to_string(&self.store).map_err(Error::CheckpointCreateError)?;
    Ok(checkpoint)
}

Using a BTreeMap to store the state instead of a HashMap ensures that the serialized keys are ordered, so given the same state, each replica will produce the same digest, which might not have been the case with a HashMap.
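
A tiny standalone example of why that matters (illustrative only, not project code): with a BTreeMap the same logical state always serializes to the same JSON, no matter the insertion order, so the digests agree across replicas.

use std::collections::BTreeMap;

fn main() {
    // Two replicas insert the same data in a different order.
    let mut replica_a = BTreeMap::new();
    replica_a.insert("fluffiness", 10);
    replica_a.insert("color", 3);

    let mut replica_b = BTreeMap::new();
    replica_b.insert("color", 3);
    replica_b.insert("fluffiness", 10);

    // BTreeMap serializes keys in sorted order, so the JSON -- and therefore
    // the digest -- is identical on both.
    let a = serde_json::to_string(&replica_a).unwrap();
    let b = serde_json::to_string(&replica_b).unwrap();
    assert_eq!(a, b);
    assert_eq!(md5::compute(a.as_bytes()).0, md5::compute(b.as_bytes()).0);
}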

NOTE: For a real system, as suggested in the paper, some copy-on-write mechanism could be used to avoid duplicating the whole data set on disk, which would be both space- and compute-heavy.

Even then, taking a checkpoint can be quite a costly operation (since we need to compute the digest of the whole state), so making checkpoints rare makes a lot of sense. However, that can result in a lot more messages being passed around during a view change, so it is a balance to be found on a per-system basis.

Now that the checkpoint is created, the replica stores it, calculates the digest, and broadcasts the Checkpoint message to the other nodes:

// Part of:  fn apply_messages(...) ...
if *last_applied % self.config.checkpoint_frequency == 0 {
    let checkpoint = state_machine.checkpoint(idx.sequence)?;
    let digest = md5::compute(checkpoint.as_bytes());

    let checkpoint_digest = CheckpointDigest(digest.0);
    state.checkpoints.insert(idx.sequence, checkpoint.clone());
    state
        .checkpoint_digests
        .insert(idx.sequence, checkpoint_digest.clone());

    // Checkpoints are later broadcasted
    checkpoints.push(
        Checkpoint {
            replica_id: self.config.node_config.self_id,
            sequence: idx.sequence,
            digest: checkpoint_digest,
        }
        .sign(&self.keypair)?,
    )
}

You might have already spotted the pattern: the fun begins whenever the replica has 2f + 1 protocol messages for the same sequence, in this case also with a matching Checkpoint digest. Such a checkpoint, in the eyes (or whatever else) of the replica, becomes stable.

A stable checkpoint essentially means that at least f + 1 honest replicas have saved the state up to that point. They therefore no longer need all the other protocol messages used to arrive at the consensus, which can now be discarded, freeing some space. At this point, we also shift the watermarks, setting the low one to the stable checkpoint sequence:

fn process_checkpoint_message(&self, state: &mut PbftState, checkpoint: SignedCheckpoint) {
...
    if entry.is_stable(self.config.node_config.nodes.len()) {
        debug!(
            seq = checkpoint.sequence,
            "checkpoint reached -- updating watermarks and discarding messages"
        );
        state.set_watermarks(checkpoint.sequence);
        self.discard_protocol_messages(state, checkpoint.sequence);
    }
}
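
The is_stable check and the watermark update are not shown above. A minimal sketch of what they might look like (names and details here are assumptions, e.g. that the checkpoint entry stores the matching Checkpoint messages it has received, and that the high watermark sits a fixed log window above the low one):

// Rough sketch only -- not the project's exact code.
impl CheckpointConsensusState {
    fn is_stable(&self, n: usize) -> bool {
        let f = (n - 1) / 3;
        // 2f + 1 Checkpoint messages with the same digest from different replicas.
        self.checkpoints.len() >= 2 * f + 1
    }
}

impl PbftState {
    fn set_watermarks(&mut self, stable_sequence: u64) {
        // The low watermark moves up to the stable checkpoint; the high one
        // stays a fixed window ahead so the log cannot grow without bound.
        self.low_watermark = stable_sequence;
        self.high_watermark = stable_sequence + self.log_window;
    }
}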

With the state nicely checkpointed, we move to the last part of the protocol…

View Changes

Perhaps the most complicated (and most fun) part of the protocol is view changes. They are also quite important, as they are necessary for good old fault tolerance – if the leader goes down, someone needs to take over.

In the words of the paper’s authors: “The view-change protocol provides liveness by allowing the system to make progress when the primary fails”. See section “4.4 View Change” for details.

The whole thing starts when a client request reaches a backup replica. The replica, being a team player, forwards the request to the leader first, but it does have limited patience. Regardless of whether the request reaches the leader or not, the backup has already started the countdown.

If the particular request ends up being applied on time, the replica stops the timer and forgets the leader’s misstep.

When the time runs out, however, the backup flips to the ViewChange state, stops accepting messages other than Checkpoint, ViewChange, and NewView, and broadcasts its own ViewChange message for view v + 1.
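
The "stops accepting" part can be as simple as an early guard when dispatching incoming protocol messages. A hypothetical sketch (the ReplicaState and ProtocolMessage names are made up for illustration):

// Illustrative guard only; the actual types in the project may differ.
if matches!(state.replica_state, ReplicaState::ViewChange { .. }) {
    match &message {
        ProtocolMessage::Checkpoint(_)
        | ProtocolMessage::ViewChange(_)
        | ProtocolMessage::NewView(_) => {} // still processed below
        _ => return Ok(()), // everything else is ignored until the view change completes
    }
}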

So far so good; now let’s look at the contents of the ViewChange message.

pub struct ViewChange {
    pub replica_id: NodeId,
    pub view: u64,
    // Last stable checkpoint for given replica. It is an Option in case we do
    // not have any checkpoints yet.
    pub last_stable_checkpoint: Option<ViewChangeCheckpoint>,
    // Proof for each prepared message (by sequence), containing at least 2f+1
    // Prepare messages from different replicas for a given message.
    // Each proof contains the pre-prepare message and the prepare messages by
    // public key of different replicas.
    pub prepared_proofs: HashMap<u64, PreparedProof>,
}

It quite obviously needs to contain the view number to which we propose the change, the replica ID to introduce itself, and a bunch of other less straightforward stuff…

As part of the ViewChange message, the replica needs to inform the new leader of its current log state and provide proof that it is not fabricated. This boils down to two elements:

  • last_stable_checkpoint (if any) - simply the most recent checkpoint that the replica considers stable, as per the rules described in the previous section. Together with it, the replica needs to deliver proof that it actually is stable, and it does so by sending the Checkpoint messages it used to conclude the stability (2f + 1 valid Checkpoint messages from different replicas):
    
    pub struct ViewChangeCheckpoint {
        pub sequence: u64,
        pub digest: CheckpointDigest,
        // Map public key to signed checkpoint message by the given replica.
        pub checkpoint_proofs: HashMap<String, SignedCheckpoint>,
    }
    
  • prepared_proofs - a set of proofs, one for each sequence number that prepared after the last stable checkpoint – meaning the messages for which the replica received 2f + 1 valid Prepare messages with a sequence higher than that of last_stable_checkpoint. The proof for each such message consists of the PrePrepare and at least 2f + 1 Prepares (again, signed ones, from different replicas) – which is essentially what made the replica conclude that the message is prepared:
    
    pub struct PreparedProof {
        pub pre_prepare: SignedPrePrepare,
        pub prepares: HashMap<String, SignedPrepare>,
    }
    

While theoretically only the new leader needs this information, the message is broadcast to everyone, so that during the next step, when the leader is ready to take over and sends NewView to the backups, they can all verify that it is legit.

Proofs and signatures

Let’s take a quick detour to discuss signatures. You might have noticed that in several places, instead of e.g. Checkpoint or Prepare, we use SignedCheckpoint or SignedPrepare.

Those are precisely what allows a replica to prove (and others to verify) that it has legit messages from other replicas (e.g. for the ViewChange message), and not just fabricated ones, as each replica signs its protocol messages with its private key.

To implement that we use a SignedMessage wrapper type:

pub struct SignedMessage<T> {
    pub message: T,
    pub signature: Vec<u8>,
    pub pub_key: [u8; 32],
}

and simply sign the JSON representation of the message:

impl<T: Serialize> SignedMessage<T> {
    pub fn new(message: T, keypair: &ed25519_dalek::Keypair) -> Result<Self> {
        let serialized = serde_json::to_string(&message).map_err(
            crate::error::Error::serde_json_error("failed to serialize message"),
        )?;
        let signature = keypair.sign(serialized.as_bytes()).to_bytes().to_vec();
        Ok(Self {
            message,
            signature,
            pub_key: keypair.public.to_bytes(),
        })
    }

    pub fn verify(&self) -> Result<bool> {
        let serialized = serde_json::to_string(&self.message).map_err(
            crate::error::Error::serde_json_error("failed to serialize message"),
        )?;

        let pub_key = ed25519_dalek::PublicKey::from_bytes(&self.pub_key).map_err(
            crate::error::Error::ed25519_error("failed to parse public key from bytes"),
        )?;

        let signature = &ed25519_dalek::Signature::from_bytes(&self.signature).map_err(
            crate::error::Error::ed25519_error("failed to parse signature from bytes"),
        )?;

        Ok(pub_key.verify(serialized.as_bytes(), signature).is_ok())
    }

    pub fn pub_key_hex(&self) -> String {
        hex::encode(self.pub_key)
    }
}

The signature can later be verified with pub_key. This also allows a node to make sure that the public key matches the replica ID in the message.
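
Usage is then symmetric: the sender builds the wrapper (via SignedMessage::new, or through a helper like the .sign(&self.keypair) call seen in the checkpoint broadcast), and the receiver calls verify() before trusting the contents. A small hypothetical snippet:

// Signing on the sender side...
let signed = SignedMessage::new(checkpoint, &self.keypair)?;

// ...and verification on the receiving side.
if !signed.verify()? {
    // drop the message, or return an error to the caller
}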

NOTE: While this may be a bit redundant given that replicas sign the whole request payload before sending it out, I have chosen to add additional signatures to individual protocol messages on top, to make it easier to verify proofs without needing to push the whole request payload through different layers, store it, decode it for verification, and send it together with ViewChange or NewView messages to other replicas.

New View

Once the frustrated client has broadcast its request to all replicas, and all the frustrated replicas have gotten tired of waiting for the message to be applied and have sent out ViewChanges to each other, the new leader is ready to emerge.

As we established earlier (in part one), the leader is determined based on the view number, hence if we are transitioning from view v to v + 1, the appropriate replica will know it is its turn to take over. It can do so – not surprisingly – when it receives 2f + 1 valid ViewChange messages from different replicas, and it does so by broadcasting the NewView message. Beforehand, however, it needs to go through the received ViewChanges and sort a few things out.
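
In the paper the new primary is simply p = v mod |R|, so the check itself can be tiny. A sketch, assuming replica IDs map to indices 0..n-1:

// Round-robin leader selection by view number, as in the paper.
fn leader_for_view(view: u64, n: usize) -> usize {
    (view % n as u64) as usize
}

// A replica knows it is next in line when leader_for_view(v + 1, n) matches its own index.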

NOTE: To be clear: since the new leader needs at least 2f + 1 ViewChange messages, it is not enough if only one backup triggers the view change, so one might wonder: how is it triggered on all of them (or at least 2f + 1)? This comes from the client behavior (see the Client Request section in part one): a client tired of waiting for responses broadcasts the request to all replicas participating in the consensus. This in turn starts the view change timer on all of them and, if the leader is down or disconnected, should result in a fairly smooth transition to the new view.

Transitioning to the next view can be tricky, as there might be a bunch of messages floating around in different stages of execution. For some, the PrePrepare may have been created but received by only some replicas before they triggered the ViewChange; others might already be prepared but not yet committed; while a few may already be applied to the state but not yet part of a checkpoint.

In order to keep the execution coherent, there are a few things the new leader needs to sort out before handling new messages. It starts by determining min-s and max-s:

  • min-s - the sequence number of the latest stable checkpoint among all the checkpoints reported in the different ViewChanges. This number serves as a base that all (or, more precisely, an honest majority of) replicas agree on; those checkpoints were stable, which guarantees that 2f + 1 replicas agreed on them.
  • max-s - the highest sequence number proved prepared in any of the ViewChanges. This one marks the last message that replicas agreed on before transitioning into the view change; anything after that point can (and will) be forgotten, as consensus was not yet achieved. (A sketch of computing both follows this list.)
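
A rough sketch of how those two could be computed from the collected ViewChange messages (field names follow the structs shown earlier; the real code would work on the signed messages and verify the attached proofs first):

fn compute_min_max_s(view_changes: &[ViewChange]) -> (u64, u64) {
    // min-s: the highest stable checkpoint sequence reported in any ViewChange.
    let min_s = view_changes
        .iter()
        .filter_map(|vc| vc.last_stable_checkpoint.as_ref().map(|c| c.sequence))
        .max()
        .unwrap_or(0);
    // max-s: the highest sequence number proved prepared in any ViewChange.
    let max_s = view_changes
        .iter()
        .flat_map(|vc| vc.prepared_proofs.keys().copied())
        .max()
        .unwrap_or(min_s);
    (min_s, max_s)
}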

Having those two numbers, the new leader can create the NewView message, consisting of:

  • New view number.
  • A set of 2f + 1 valid (and signed) ViewChange messages from different replicas - as proof to others that it is legit.
  • A set of PrePrepare messages for all messages between min-s and max-s (the sequence numbers for those do not change, just the view they are in), or, if such a set would be empty (essentially min-s == max-s), a PrePrepare the leader creates for a special NULL request, which is a no-op (it does not change the state in any way).
pub struct NewView {
    pub view: u64,
    // Proof of View Change messages received from different replicas
    pub view_change_messages: HashMap<String, SignedViewChange>,
    // Pre-prepare messages for those that were prepared in previous view, but
    // were not included in the last stable checkpoint
    // (or a single PrePrepare for NULL message).
    pub pre_prepares: Vec<SignedPrePrepare>,
}

Afterward, the NewView is ready to be broadcast, and the new leader officially transitions into its role. Before things get running smoothly, however, the replicas need to receive the NewView, ensure it is correct (verify those sweet proofs…), store the new PrePrepare messages, and essentially push them through the whole protocol (by creating Prepares and so on).
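
Verifying a single prepared proof is, roughly, checking the signatures and counting matching Prepares. A sketch (it assumes PrePrepare and Prepare both expose the request digest on the wrapped message, and that f is derived from the cluster size as before):

// Rough sketch only -- not the project's exact code.
fn verify_prepared_proof(proof: &PreparedProof, f: usize) -> bool {
    if !proof.pre_prepare.verify().unwrap_or(false) {
        return false;
    }
    let digest = &proof.pre_prepare.message.digest;
    let valid_prepares = proof
        .prepares
        .values()
        .filter(|p| p.verify().unwrap_or(false) && &p.message.digest == digest)
        .count();
    // 2f + 1 valid, matching Prepare messages from different replicas, as
    // required for the proofs carried in ViewChange.
    valid_prepares >= 2 * f + 1
}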

The last_applied property, together with the fact that the sequence numbers for those requests did not change, guarantees that they are not executed (or applied to the state) a second time in case they were already committed before the view change.

Summary

This concludes my little write-up about implementing the Practical Byzantine Fault Tolerance Algorithm (finally :sweating:). If you got this far, thank you for reading, and I hope you found something useful here or learned a thing or two - I certainly did.

If you spot any errors, have some suggestions, or just have something you want to share, feel free to reach out, comment, or whatever - I appreciate it. And for now, I am happy to close this chapter of my side projects book and move on to the next one…