[
https://issues.apache.org/jira/browse/HDFS-3077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13229343#comment-13229343
]
Todd Lipcon commented on HDFS-3077:
-----------------------------------
Hi Suresh,
Apologies in advance for the length of this comment - I wanted to cover all the
details clearly.
If I understand your proposal correctly from what we discussed yesterday, it is:
1. Configure N journals. Pick some value W < N which is the number of required
journals for the ANN to stay up.
2. On write, try to write to all N, synchronously (adding timeouts for any
RPC-based writes)
3. If a replica fails, drop it for future writes until it recovers. If dropping
a journal results in fewer than W active journals, abort the NN.
4. On read, open at least (N - W + 1) replicas. Sort them by their highest
txid and pick the Wth highest in the list. Synchronize the other edit logs
with that one (truncating longer logs and extending shorter ones).
The logic of this solution is that, since the set of replicas read at recovery
time (at least N - W + 1) must overlap the set of replicas that acknowledged
any committed write (at least W), we guarantee that the latest committed edits
are indeed seen by the new active. For example, with N = 3 and W = 2, recovery
reads at least 2 of the 3 replicas, and any 2 of them must include at least one
that saw every committed transaction.
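For concreteness, here is a rough Java sketch of steps 2-4 as I understand them. QuorumJournalWriter, JournalChannel, and the method names are purely hypothetical, just to make the control flow explicit:
{code:java}
// Hypothetical sketch of steps 2-4 above; JournalChannel and the other names
// are illustrative stand-ins, not existing HDFS classes.
import java.io.IOException;
import java.util.*;

class QuorumJournalWriter {
  interface JournalChannel {
    void journal(long txid, byte[] edits) throws IOException;
  }

  private final List<JournalChannel> activeJournals; // starts with all N
  private final int requiredJournals;                // W

  QuorumJournalWriter(List<JournalChannel> journals, int w) {
    this.activeJournals = new ArrayList<>(journals);
    this.requiredJournals = w;
  }

  // Steps 2-3: write synchronously to every journal still considered healthy;
  // drop any journal that errors, and abort the NN if fewer than W remain.
  void write(long txid, byte[] edits) throws IOException {
    Iterator<JournalChannel> it = activeJournals.iterator();
    while (it.hasNext()) {
      JournalChannel j = it.next();
      try {
        j.journal(txid, edits);   // synchronous RPC with a timeout
      } catch (IOException e) {
        it.remove();              // drop this replica for future writes
      }
    }
    if (activeJournals.size() < requiredJournals) {
      throw new IOException("fewer than W journals remain; aborting NN");
    }
  }

  // Step 4: given the highest txid of each replica opened at recovery time
  // (at least N - W + 1 of them), recover everyone to the Wth highest.
  static long chooseRecoveryTxid(List<Long> highestTxids, int w) {
    List<Long> sorted = new ArrayList<>(highestTxids);
    sorted.sort(Collections.reverseOrder());
    return sorted.get(w - 1);     // assumes at least W replicas were opened
  }
}
{code}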
This is mostly great -- so long as you have an external fencing strategy which
prevents the old active from attempting to continue to write after the new
active is trying to read. In the current HA story, the fencing strategy is PDU
STONITH or fencing hooks into a NAS device, but the major goal of this issue is
to remove the need for such specialized hardware.
One obvious example of where this fails without fencing is the following:
- Active NN successfully writes to 1 of 3 replicas, then the network connection
goes flaky. The write to the second replica is left in a TCP buffer.
- ZK indicates the NN is dead. The standby notices this and fails over. Since
the above write only hit one replica, it is counted as "uncommitted", and
thrown away during recovery.
- Network connection resumes. Another replica receives the write and ACKs it.
The old active then thinks the transaction was committed. Only then does it
notice that the ZK lease is gone. Now we have divergent state since a client
may think his transaction was committed when in fact it was not.
So, it follows that we just need to introduce some simple fencing/lease
revocation into the logger daemons, such that before the new standby does its
recovery process (step 4 above), it gets the loggers to promise not to accept
edits from the old active. The simplest scheme I could come up with is the following:
- add an RPC to the loggers called "fence old active and only accept writes
from me"
- when a logger receives this, it promises to only accept writes from the new
active (each node can have a UUID at startup)
- thus, in the above scenario, when the second write finally gets through after
the network reconnects, the logger denies it
- before the reader performs recovery (step 4 above), it gets a quorum of
loggers to fence the old active, thus preventing any further commits
during/after recovery.
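To make that concrete, here's a hypothetical sketch of the logger-side check. The class, method, and field names are made up, and the real thing would of course be a Hadoop IPC call rather than a plain method:
{code:java}
// Hypothetical logger-side fencing; names are illustrative only.
import java.io.IOException;
import java.util.UUID;

class JournalLogger {
  private UUID allowedWriter;     // null until the first fence request

  // The "fence old active and only accept writes from me" RPC.
  synchronized void fence(UUID newWriter) {
    this.allowedWriter = newWriter;
  }

  // Every journal() call carries the writer's startup UUID.
  synchronized void journal(UUID writer, long txid, byte[] edits)
      throws IOException {
    if (allowedWriter != null && !allowedWriter.equals(writer)) {
      // e.g. the old active's delayed write finally arriving after the
      // network reconnects -- it must be rejected, not appended.
      throw new IOException("writer " + writer + " has been fenced");
    }
    appendToLocalLog(txid, edits);
  }

  private void appendToLocalLog(long txid, byte[] edits) {
    // append to the local edit log file (elided)
  }
}
{code}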
But we still have one more problem: multiple actives might have tried to write
different transactions under the same txid. Example scenario:
- Active is writing just fine, and then loses network connectivity just before
writing transaction 101. It manages to send txid 101 to its local replica but
nowhere else
- Standby gets a quorum of nodes to agree to fence the old active, and then
recovers to transaction id 100 (since no live node ever saw #101)
- Standby writes transaction 101 correctly to the two live nodes, then all
nodes crash
- Now when we come back up, all of the nodes have edit logs of the same length
(101 edits), but depending on which replica we read, we might end up using the
incorrect (uncommitted) value for txn 101 instead of the committed one.
You can also engineer such a situation a number of other ways. It's slightly
contrived, but in this area, I want to use a protocol that is _correct_ (not
just _usually correct_) in the face of failures.
So, the above implies we need something to distinguish in-flight transactions
for recovery aside from simply their transaction ID. In stable versions of HDFS
we use the "fstime" file for this case - whenever we do an error recovery, we
bump the fstime on the still-alive storage dirs, and at recovery time we
ignore directories whose fstime is older than the latest. Doing the same here
would rely on synchronized clocks, which is a distributed-systems faux pas
(clock skew between your NNs shouldn't result in data loss!).
A nice parallel here is what we do with the HDFS write pipeline recovery -- the
generation stamp of a block is basically like a logical timestamp. Replicas
that don't participate in recovery are left with an older genstamp, which
allows us to disregard them in the future after more data has been appended.
So, we need something like a generation stamp for our writers, such that each
writer gets a distinct generation stamp higher than all previous generation
stamps. Assume we have some magic oracle to provide these genstamps. Our
protocol is now:
- NN starts up, and gets a genstamp N from the oracle which is higher than all
previous ones.
- It gets a quorum of loggers to agree not to accept any actions from NNs with
a lower genstamp. The loggers also respond with the genstamp of the previous
writer, if that writer had uncommitted transactions.
- The NN performs recovery, ignoring transactions from "ancient" replicas
(i.e. not from the newest unrecovered active).
- NN now can take over and start writing new transactions.
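Purely as a sketch of the idea (none of these names exist anywhere), the logger-side epoch handling and the recovery-time comparison might look roughly like this:
{code:java}
// Hypothetical sketch of genstamp ("epoch") handling on a logger. A real
// implementation would persist promisedEpoch and lastWriterEpoch durably.
import java.io.IOException;

class EpochAwareLogger {
  private long promisedEpoch = 0;    // highest epoch we've promised to honor
  private long lastWriterEpoch = 0;  // epoch of the last writer we accepted

  // Called by a new active with its genstamp from the oracle. Returns the
  // previous writer's epoch so recovery can ignore replicas whose tail was
  // written by an "ancient" active.
  synchronized long newEpoch(long epoch) throws IOException {
    if (epoch <= promisedEpoch) {
      throw new IOException("epoch " + epoch + " is not newer than " + promisedEpoch);
    }
    promisedEpoch = epoch;
    return lastWriterEpoch;
  }

  synchronized void journal(long epoch, long txid, byte[] edits)
      throws IOException {
    if (epoch < promisedEpoch) {
      throw new IOException("rejecting write from fenced epoch " + epoch);
    }
    lastWriterEpoch = epoch;
    // append to the local edit log (elided)
  }

  // At recovery time, a replica's tail is compared first by writer epoch and
  // only then by txid, so an uncommitted txn 101 from an old active loses to
  // txn 101 rewritten by a newer active (the scenario described earlier).
  static int compareTails(long epochA, long txidA, long epochB, long txidB) {
    if (epochA != epochB) {
      return Long.compare(epochA, epochB);
    }
    return Long.compare(txidA, txidB);
  }
}
{code}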
If you're familiar with ZAB, you'll notice that we're now getting vanishingly
close to an implementation thereof -- the "genstamp" above is what ZAB calls an
"epoch". The only thing we haven't included is how this genstamp oracle is
implemented:
- In ZAB, a distributed protocol allows a new leader/active to generate a
unique, increasing epoch number.
- In an initial implementation of this system, I am planning to bootstrap off
of ZK to generate these numbers. This keeps the system simpler.
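For example, one simple (hypothetical) way to get such numbers out of ZK is a versioned compare-and-swap on a counter znode; the znode path and class name here are made up, and this isn't necessarily how the real implementation will do it:
{code:java}
// One hypothetical way to get a monotonically increasing genstamp out of ZK:
// keep the current epoch in a znode and bump it with a versioned setData()
// (i.e. a compare-and-swap). Just a sketch of the idea.
import java.nio.ByteBuffer;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

class ZkEpochOracle {
  private static final String EPOCH_ZNODE = "/hdfs-edits-epoch"; // hypothetical path
  private final ZooKeeper zk;

  ZkEpochOracle(ZooKeeper zk) { this.zk = zk; }

  long nextEpoch() throws KeeperException, InterruptedException {
    while (true) {
      Stat stat = new Stat();
      long current;
      try {
        byte[] data = zk.getData(EPOCH_ZNODE, false, stat);
        current = ByteBuffer.wrap(data).getLong();
      } catch (KeeperException.NoNodeException e) {
        // First writer ever: try to create the counter znode at epoch 1.
        try {
          zk.create(EPOCH_ZNODE, ByteBuffer.allocate(8).putLong(1L).array(),
              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
          return 1L;
        } catch (KeeperException.NodeExistsException raced) {
          continue;  // someone else created it first; retry the read
        }
      }
      long next = current + 1;
      try {
        // The expected-version argument makes the increment atomic: if another
        // would-be active bumped the counter first, this throws BadVersion.
        zk.setData(EPOCH_ZNODE, ByteBuffer.allocate(8).putLong(next).array(),
            stat.getVersion());
        return next;
      } catch (KeeperException.BadVersionException raced) {
        // lost the race; loop and try again against the new version
      }
    }
  }
}
{code}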
The only remaining difference between your proposal and the above is the actual
commit protocol. Currently, the NN tries to commit to all replicas and only
responds after all have succeeded, or after dropping the "errored" replicas
from the list. I think a quorum commit is vastly superior for HA, especially
given we'd like to collocate the log replicas on machines doing other work.
When those machines have latency hiccups, or crash, we don't want the active NN
to have to wait for long timeout periods before continuing. For example, using
the current BN code, if you {{kill -STOP}} the BN, the NN will hang
indefinitely.
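To illustrate the difference, a hypothetical quorum-commit loop would wait for only W acknowledgements rather than all N, so a slow or hung replica (like the {{kill -STOP}}'d BN above) can't stall the writer. The names here are again made up:
{code:java}
// Hypothetical sketch of a quorum commit: send the edit to all N journals in
// parallel, but return as soon as W of them have acknowledged.
import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;

class QuorumCommitter {
  interface JournalChannel {
    void journal(long txid, byte[] edits) throws IOException;
  }

  private final ExecutorService pool = Executors.newCachedThreadPool();

  void commit(List<JournalChannel> journals, int w, long txid, byte[] edits)
      throws IOException, InterruptedException {
    CompletionService<Boolean> cs = new ExecutorCompletionService<>(pool);
    for (JournalChannel j : journals) {
      cs.submit(() -> { j.journal(txid, edits); return true; });
    }
    int acks = 0, failures = 0;
    final int n = journals.size();
    while (acks < w) {
      Future<Boolean> f = cs.take();  // blocks only until the *next* reply
      try {
        f.get();
        acks++;
      } catch (ExecutionException e) {
        // Once more than N - W journals have failed, a quorum is impossible.
        if (++failures > n - w) {
          throw new IOException("too many journals failed for txid " + txid, e);
        }
      }
    }
    // Any journals that are still slow or hung finish or time out in the
    // background; the NN does not block on them here.
  }
}
{code}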
My overall philosophy is the same as yours -- it should be as simple as
possible -- but like Einstein said, no simpler! Whenever I start trying to
design a protocol that ends up smelling just like a consensus protocol, I'd
rather go "by the book" since I know Ben Reed and folks are way smarter than me
and thus much more likely to get it right than an ad-hoc design.
> Quorum-based protocol for reading and writing edit logs
> -------------------------------------------------------
>
> Key: HDFS-3077
> URL: https://issues.apache.org/jira/browse/HDFS-3077
> Project: Hadoop HDFS
> Issue Type: New Feature
> Components: ha, name-node
> Reporter: Todd Lipcon
> Assignee: Todd Lipcon
>
> Currently, one of the weak points of the HA design is that it relies on
> shared storage such as an NFS filer for the shared edit log. One alternative
> that has been proposed is to depend on BookKeeper, a ZooKeeper subproject
> which provides a highly available replicated edit log on commodity hardware.
> This JIRA is to implement another alternative, based on a quorum commit
> protocol, integrated more tightly in HDFS and with the requirements driven
> only by HDFS's needs rather than more generic use cases. More details to
> follow.