A Few Notes on Kafka and Jepsen

Kyle has a good write-up on replication and partitions in Kafka. I am a big fan of this methodology (empiricism FTW), though it is a bit uncomfortable to watch one’s own project strapped to the apparatus.

Kyle’s explanation is pretty clear, so I have only a few things to add.

First, as Kyle notes, we have unfortunately not yet beaten the CAP theorem, although to maintain parity with the rest of the distributed systems ecosystem we promise to start working on this as soon as possible.

It’s worth describing our application domain to understand our motivations. Kafka is meant as a data journal (like HDFS, but targeted at real-time streams) rather than a metadata journal (like Zookeeper). These often have slightly different design tradeoffs. For example, the HDFS namenode handles metadata and now uses a majority-vote quorum approach for HA (or just a single master in older versions), but the HDFS data nodes need to be a little more parsimonious with the actual data blocks (because people don’t want to have to store five copies of all their data to tolerate two failures). Kyle describes our motivation as handling more failures, which is correct and equivalent, but it is a little more intuitive to think of it as keeping fewer copies of data, since your failure-tolerance requirement is probably fixed and you add replicas to meet it. Essentially we want to make replication a practical default approach to handling high-volume real-time data problems.

At LinkedIn, for example, we are running Kafka with replication in production for all data and are doing north of 40 billion writes a day through this replicated log. So the number of physical writes that replication requires is a real and practical concern.

As Kyle points out, there is no correct algorithm for guaranteeing consistency in the face of f failures with fewer than 2f+1 servers, but this turns out not to require 2f+1 copies of each piece of data. The trick to keeping fewer copies of data while maintaining consistency is to treat data writes differently from configuration changes (which, for anyone who is not a log-replication nerd, means changes to the set of brokers replicating writes). This idea is by no means original to us. Cheap Paxos is one of Leslie Lamport’s Paxos variants that does something along these lines with the same goal, and PacificA is a system using a similar technique. This split makes sense for a system designed to handle large data volumes because the data will be much, much larger than the metadata about configuration (you might have tens or hundreds of data nodes, but the metadata remains tiny).
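
To make the copy-count arithmetic concrete, here is a small sketch (my own illustration in Python, not anything from the Kafka codebase) of what each approach needs in order to tolerate f failures, assuming the metadata quorum stores only the tiny configuration state and not the data itself:

```python
def majority_quorum_copies(f: int) -> int:
    """Full data copies a majority-vote quorum needs to tolerate f failures."""
    return 2 * f + 1


def split_approach_copies(f: int) -> tuple[int, int]:
    """(full data copies, metadata-only nodes) when data writes and
    configuration changes are handled separately, as described above."""
    return f + 1, 2 * f + 1


for f in (1, 2):
    data, meta = split_approach_copies(f)
    print(f"tolerate {f} failure(s): "
          f"majority quorum needs {majority_quorum_copies(f)} data copies; "
          f"the split approach needs {data} data copies plus {meta} tiny metadata nodes")
```

For two failures that is the difference between five full copies of every byte and three, which is exactly the point above about not wanting to store five copies of all your data.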

A log replication algorithm typically needs to guarantee something along the lines of “committed writes are not lost”. In a leader-based log replication algorithm this usually means that the leader must have all the committed writes. To ensure that this property holds even when a new leader is chosen, there must be some overlap between the replicas that have the write and the set of nodes that participate in choosing the new leader (so that the chosen leader has all the committed writes). This is the quorum property. Any overlap between these sets will work: for example, you can require a majority vote for the write and a majority vote for leader election, or you can require only a single acknowledgement on write but a unanimous vote to elect a leader (not at all useful, but correct!).
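
To see the overlap requirement concretely, here is a tiny brute-force check (an illustration of the property, not Kafka code): any write-acknowledgement set of size W and any election set of size E drawn from N replicas must share a node, which holds whenever W + E > N.

```python
from itertools import combinations


def quorums_always_overlap(n: int, w: int, e: int) -> bool:
    """True if every write-ack set of size w and every election set of size e
    chosen from n replicas share at least one replica."""
    nodes = range(n)
    return all(set(write_set) & set(election_set)
               for write_set in combinations(nodes, w)
               for election_set in combinations(nodes, e))


print(quorums_always_overlap(5, 3, 3))  # True: majority write + majority election
print(quorums_always_overlap(5, 1, 5))  # True: one ack + unanimous election (correct, not useful)
print(quorums_always_overlap(5, 1, 3))  # False: one ack + majority election is unsafe
```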

Kafka does something a little different: it maintains a dynamic set of in-sync replica brokers (the ISR) that grows and shrinks as failures occur. Every broker in this set must acknowledge a write for it to be committed; as a result, any broker in the ISR has all committed messages and is eligible for election. However, a failure to acknowledge will cause a broker to drop out of the in-sync set, reducing the set of nodes that must acknowledge. This is great, but it pushes the consistency problem into maintaining the ISR. We hold the ISR in Zookeeper, which does a full majority quorum for its writes. In order to rejoin the ISR, a failed node must catch up on replicating the log to come back into sync with the master.
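
A highly simplified sketch of that bookkeeping (hypothetical toy code, ignoring Zookeeper, partitions, and real failure detection) might look like this:

```python
class Partition:
    """Toy model of ISR-style replication, for illustration only."""

    def __init__(self, replicas):
        self.log = []                            # committed messages
        self.replica_logs = {r: [] for r in replicas}
        self.isr = set(replicas)                 # in-sync replicas

    def write(self, msg):
        # A write is committed only once every current ISR member has it,
        # so any ISR member is a safe choice of leader.
        for r in self.isr:
            self.replica_logs[r].append(msg)
        self.log.append(msg)

    def fail(self, replica):
        # A replica that stops acknowledging drops out of the ISR,
        # shrinking the set that must ack future writes.
        self.isr.discard(replica)

    def rejoin(self, replica):
        # To rejoin the ISR, a recovered replica must first catch up on the log.
        self.replica_logs[replica] = list(self.log)
        self.isr.add(replica)


p = Partition(["a", "b", "c"])
p.write("m1")
p.fail("c")                    # ISR shrinks to {a, b}
p.write("m2")                  # committed with only a and b acking
p.rejoin("c")                  # c catches up, then re-enters the ISR
print(sorted(p.isr), p.replica_logs["c"])   # ['a', 'b', 'c'] ['m1', 'm2']
```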

In this sense (as with Cheap Paxos and other variants) there is actually a caveat on the failure tolerance: we can tolerate N-1 Kafka node failures, but only a minority of Zookeeper failures (fewer than half of the ensemble). This actually is sensible, though, as Kafka node count scales with data size but Zookeeper node count doesn’t. So we would commonly have five Zookeeper replicas but only replication factor 3 within Kafka (even if we have many, many Kafka servers—data in Kafka is partitioned so not all nodes are identical).
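
For the concrete deployment just described (replication factor 3 in Kafka, a five-node Zookeeper ensemble), the two tolerances work out as in this small sketch (my own arithmetic, not output from either system):

```python
def kafka_failures_tolerated(replication_factor: int) -> int:
    # A partition stays consistent as long as at least one in-sync replica survives.
    return replication_factor - 1


def zookeeper_failures_tolerated(ensemble_size: int) -> int:
    # Zookeeper needs a strict majority of the ensemble to keep running.
    return (ensemble_size - 1) // 2


print(kafka_failures_tolerated(3))      # 2 replica failures per partition
print(zookeeper_failures_tolerated(5))  # 2 Zookeeper failures
```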

This approach has pros and cons. The pro is basically fewer copies of data, which makes the kind of large data-volume problems we target a lot more practical. The con is primarily having what amounts to two types of quorums (Kafka’s quorum and Zookeeper’s quorum) and more nuanced failure detection. The criteria for a node to be alive thus include both replicating the leader and maintaining a Zookeeper connection. Failure detection can always be a bit finicky in practice, so this is a real issue. But since Kafka already had a Zookeeper dependency for a variety of other uses, this seemed an okay tradeoff to make.

The issue Kyle demonstrates makes for a good illustration. In this scenario Kyle kills off all but one node in the ISR, then writes to the remaining node (which is now the leader), then kills this node and brings back the other nodes. I actually think the issue here is what we call “unclean leader election” rather than our approach to quorums or anything specific to network partitions. Any type of failure executed in this pattern should work just as well as network partitions to reproduce this case.

An equivalent scenario can be constructed for a majority vote quorum. For example, consider using a majority vote quorum with 3 nodes (so you can tolerate 1 failure). Now say that one of the nodes in this three-node quorum is obliterated. If you accept a write with only two servers, the failure of another server breaks the quorum property, so you will no longer be able to elect a new master or guarantee consistency. I think this is the same as accepting a write in Kafka with only a single remaining server given our quorum properties—both cases are one server failure away from data loss.

The data loss that Kyle causes actually comes from our behavior in handling the case where all nodes die. This is an interesting thing to consider. For example, in the majority vote quorum, what do you do if you have only a single server remaining out of your three-node cluster? To maintain your formal distributed-systems guarantee you need not do anything, since the precondition of your guarantee has been broken. At this point you can technically just printf(“Sucks to be you”) and exit(666). But a practical system likely needs to think this through. After all, you would argue, I still have almost all my data on my one remaining server. The crux is that if you use this data you potentially violate consistency and lose (or gain!) committed writes; if you don’t, you remain consistent, but if you can’t revive your other machines you’ve lost everything.

Is it better to be alive and wrong or right and dead?

As usual the right thing to do depends on the application. Perhaps you can recover the nodes and bring them back with data intact, in which case waiting around with the system down may be the best thing. But it may also be that you are running a “system of record” for writes from online traffic, in which case downtime equals data loss, and you had better bring back up whatever you’ve got right-the-fuck-now.

Currently we are more than a little aggressive about pursuing the latter approach—when all nodes are down we will elect the first node to come up as the leader. This can be dangerous for use cases that require strong consistency: we will do this, print a nasty warning in the logs about data loss, and continue merrily along.
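
To make the tradeoff explicit, here is a toy sketch of the two possible policies when replicas come back after a total outage; the allow_unclean flag is purely hypothetical for illustration, not an actual Kafka setting (as noted below, the option to disable this behavior is still an open JIRA):

```python
def choose_leader(last_isr, live_replicas, allow_unclean):
    """Toy leader choice after a total outage (illustration only).

    last_isr      -- the last known in-sync set; these replicas are guaranteed
                     to have every committed write
    live_replicas -- replicas currently reachable, in the order they came back
    allow_unclean -- hypothetical toggle between availability and consistency
    """
    clean = [r for r in live_replicas if r in last_isr]
    if clean:
        return clean[0]                  # safe: has all committed writes
    if allow_unclean and live_replicas:
        return live_replicas[0]          # available, but may lose committed writes
    return None                          # stay down until an ISR member returns


# The last ISR was {"a"}, but only "b" and "c" have come back so far.
print(choose_leader({"a"}, ["b", "c"], allow_unclean=True))   # 'b' (risks data loss)
print(choose_leader({"a"}, ["b", "c"], allow_unclean=False))  # None (wait for 'a')
```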

For Kafka we actually do have both types of application. When used as a log directly by applications, Kafka is the system of record, so downtime generally means data loss. In this case I think availability is key and our consistency guarantees are already more than good enough. But for processing downstream of this—say a Samza job that pulls data out of upstream Kafka topics, processes it, and produces new data to downstream topics—downtime need not mean data loss, just delay (provided you can eventually restore a formerly in-sync node).

Kyle’s recommendation of disabling “unclean election” and thus requiring a node in the “in sync set” to come back before accepting further reads and writes is a good one, and I think this provides more flexibility around handling this case. We have had a JIRA to add this for a while, but haven’t quite gotten around to it.

The other suggestion of requiring additional acks beyond the minimum is interesting. This does reduce the probability of a committed message being lost, though doing so reduces the availability for writes. But you could actually make the same argument for a majority quorum algorithm: it could have a setting that raises the minimum acknowledgement above a majority to avoid “risky” low-redundancy writes.
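
As a sketch of that idea (just an illustration, not an actual setting in either system), raising the required acknowledgements above a bare majority means every committed write lands on more replicas, but writes start being rejected sooner as nodes fail:

```python
def write_accepted(live_replicas: int, required_acks: int) -> bool:
    """A write can be accepted only if enough live replicas can acknowledge it."""
    return live_replicas >= required_acks


n = 5
majority = n // 2 + 1    # 3 acks: the minimum that preserves the quorum property
stricter = majority + 1  # 4 acks: avoids "risky" low-redundancy writes

for live in range(n, 0, -1):
    print(f"{live} live: "
          f"majority-ack {'accepted' if write_accepted(live, majority) else 'rejected'}, "
          f"stricter-ack {'accepted' if write_accepted(live, stricter) else 'rejected'}")
```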

There is a lot more detail on Kafka’s approach to replication and the tradeoffs it involves here. There are a lot of other interesting bits there for fellow distributed systems nerds.
