[ 
https://issues.apache.org/jira/browse/HDFS-3077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13229343#comment-13229343
 ] 

Todd Lipcon commented on HDFS-3077:
-----------------------------------

Hi Suresh,

Apologies in advance for the length of this comment - I wanted to cover all the 
details clearly.

If I understand your proposal correctly from what we discussed yesterday, it is:
1. Configure N journals. Pick some value W < N which is the number of required 
journals for the ANN to stay up.
2. On write, try to write to all N, synchronously (adding timeouts for any 
RPC-based writes)
3. If a replica fails, drop it for future writes until it recovers. If dropping 
a journal results in fewer than W active journals, abort the NN.
4. On read, open at least (N - W + 1) replicas. Sort them by their highest 
txid, and pick the Wth highest in the list. Synchronize the other edit logs 
with that one (truncating the longer logs and extending the shorter ones).

The logic of this solution is that, since any set of (N - W + 1) replicas must 
intersect any write quorum of W replicas, the latest committed edits are 
guaranteed to be seen by the new active.
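The read-side selection in step 4 can be sketched as follows. This is purely 
illustrative (the class and method names are made up, not HDFS code), assuming 
each opened replica reports the highest txid it has on disk:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Illustrative sketch of step 4: choose the txid to recover to, given the
 * highest txids reported by the >= (N - W + 1) replicas we managed to open.
 */
class RecoveryTxidChooser {
  static long chooseRecoveryTxid(List<Long> highestTxids, int w) {
    List<Long> sorted = new ArrayList<>(highestTxids);
    sorted.sort(Collections.reverseOrder()); // highest txid first
    return sorted.get(w - 1);                // the Wth highest in the list
  }
}
```

For example, with N = 3 and W = 2, if the two reachable replicas report txids 
101 and 100, recovery picks 100: txid 101 was seen by fewer than W replicas, 
so it cannot have been committed and is discarded.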

This is mostly great -- so long as you have an external fencing strategy which 
prevents the old active from attempting to continue to write after the new 
active is trying to read. In the current HA story, the fencing strategy is PDU 
STONITH or fencing hooks into a NAS device, but the major goal of this issue is 
to remove the need for such specialized hardware.

One obvious example of where this fails without fencing is the following:
- Active NN successfully writes to 1 of 3 replicas, then the network connection 
goes flaky. The write to the second replica is left in a TCP buffer.
- ZK indicates the NN is dead. The standby notices this and fails over. Since 
the above write only hit one replica, it is counted as "uncommitted", and 
thrown away during recovery.
- Network connection resumes. Another replica receives the write and ACKs it. 
The old active then thinks the transaction was committed. Only then does it 
notice that the ZK lease is gone. Now we have divergent state since a client 
may think his transaction was committed when in fact it was not.

So, it follows that we just need to introduce some simple fencing/lease 
revocation into the logger daemons, such that before the new standby does its 
recovery process (step 4 above), it gets the loggers to promise not to accept 
edits from the old active. The simplest I could come up with is the following:

- add an RPC to the loggers called "fence old active and only accept writes 
from me"
- when a logger receives this, it promises to only accept writes from the new 
active (each node can have a UUID at startup)
- thus, in the above scenario, when the second write finally gets through after 
the network reconnects, the logger denies it
- before the reader performs recovery (step 4 above), it gets a quorum of 
loggers to fence the old active, thus preventing any further commits 
during/after recovery.
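The promise a logger makes here is simple state. A minimal sketch (again 
hypothetical names; a real logger would persist this promise and the edits):

```java
/**
 * Illustrative sketch of the fencing promise: once fenced to a new writer's
 * UUID, a logger rejects any late write from the old active.
 */
class FencingLogger {
  private String allowedWriter = null; // UUID of the only writer we accept

  /** RPC: "fence the old active and only accept writes from me". */
  synchronized void fence(String newWriterUuid) {
    allowedWriter = newWriterUuid;
  }

  /** RPC: append edits; rejected if the caller has been fenced out. */
  synchronized boolean write(String writerUuid, long txid) {
    if (allowedWriter != null && !allowedWriter.equals(writerUuid)) {
      return false; // e.g. the delayed TCP write from the old active
    }
    return true;    // accept (a real logger would persist the edits here)
  }
}
```

In the flaky-network scenario above, the second replica would have been fenced 
to the new active's UUID before the old active's buffered write arrived, so 
that write is denied and the old active learns the transaction did not commit.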

But, we still have one more problem: given some txid N, we might have multiple 
actives that have tried to write the same transaction ID. Example scenario:

- Active is writing just fine, and then loses network connectivity just before 
writing transaction 101. It manages to send txid 101 to its local replica but 
nowhere else
- Standby gets a quorum of nodes to agree to fence the old active, and then 
recovers to transaction id 100 (since no live node ever saw #101)
- Standby writes transaction 101 correctly to the two live nodes, then all 
nodes crash
- Now when we come back up, all of the nodes have the same length edit logs 
(101 edits), but depending on which replica we read, we might end up using the 
incorrect (uncommitted) value instead of the committed one.

You can also engineer such a situation a number of other ways. It's slightly 
contrived, but in this area, I want to use a protocol that is _correct_ (not 
just _usually correct_) in the face of failures.

So, the above implies we need something to distinguish in-flight transactions 
for recovery aside from simply their transaction ID. In stable versions of HDFS 
we use the "fstime" file for this case - whenever we do an error recovery, we 
bump the fstime on the still-alive storage dirs, and at recovery time, we 
ignore directories with less than the latest time. Doing the same here would 
rely on synchronized clocks, which is a distributed systems faux pas (clock 
skew between your NNs shouldn't result in data loss!!).

A nice parallel here is what we do with the HDFS write pipeline recovery -- the 
generation stamp of a block is basically like a logical timestamp. Replicas 
that don't participate in recovery are left with an older genstamp, which 
allows us to disregard them in the future after more data has been appended. 
So, we need something like a generation stamp for our writers, such that each 
writer gets a distinct generation stamp higher than all previous generation 
stamps. Assume we have some magic oracle to provide these genstamps. Our 
protocol is now:

- NN starts up, and gets a genstamp N from the oracle which is higher than all 
previous ones.
- It gets a quorum of loggers to agree not to accept any actions from NNs with 
a lower genstamp. The loggers also respond with the genstamp of the previous 
writer, if that writer had uncommitted transactions.
- The NN performs recovery, ignoring transactions from "ancient" replicas 
(i.e. not from the newest unrecovered active).
- NN now can take over and start writing new transactions.
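The logger's side of this protocol can be sketched like so (hypothetical names, 
and the real daemon would persist both epochs durably):

```java
/**
 * Illustrative sketch: a logger that only honors the highest genstamp
 * ("epoch") it has promised to.
 */
class EpochLogger {
  private long promisedEpoch = 0;   // highest genstamp we have promised to
  private long lastWriterEpoch = 0; // genstamp of the last writer we accepted

  /**
   * New-active handshake: promise to ignore lower genstamps, and report the
   * previous writer's genstamp so "ancient" replicas can be disregarded
   * during recovery.
   */
  synchronized long newEpoch(long epoch) {
    if (epoch <= promisedEpoch) {
      throw new IllegalStateException("epoch " + epoch + " is not newer");
    }
    promisedEpoch = epoch;
    return lastWriterEpoch;
  }

  /** Write path: a writer with a stale genstamp is rejected. */
  synchronized boolean write(long epoch, long txid) {
    if (epoch < promisedEpoch) {
      return false; // fenced-out old active
    }
    lastWriterEpoch = epoch;
    return true;    // a real logger would persist the edits here
  }
}
```

This subsumes the UUID-based fencing above: the epoch comparison both fences 
the old writer and tells recovery which replicas carry the newest writer's 
transactions.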

If you're familiar with ZAB, you'll notice that we're now getting vanishingly 
close to an implementation thereof -- the "genstamp" above is what ZAB calls an 
"epoch". The only thing we haven't included is how this genstamp oracle is 
implemented:

- In ZAB, it uses a distributed protocol to allow a new leader/active to 
generate a unique increasing epoch number.
- In an initial implementation of this system, I am planning to bootstrap off 
of ZK to generate these numbers. This will keep the system simpler.
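One plausible shape for such an oracle is a compare-and-swap loop over a shared 
counter: read the current value, try to install current + 1, and retry if 
another NN won the race. In the ZK-backed version the counter would live in a 
znode and the CAS would be a versioned setData; the sketch below (my own 
illustration, not a committed design) stands in an AtomicLong for that znode:

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Illustrative sketch of the "genstamp oracle": allocate a unique, strictly
 * increasing epoch via compare-and-swap on shared state. An AtomicLong
 * stands in for the shared counter (e.g. a znode updated with a version
 * check in the ZK-backed implementation).
 */
class EpochOracle {
  private final AtomicLong shared = new AtomicLong(0); // stand-in for a znode

  long allocateEpoch() {
    while (true) {
      long current = shared.get();
      long next = current + 1;
      // With ZK this would be setData(path, encode(next), currentVersion),
      // retrying on a version mismatch if another NN raced us.
      if (shared.compareAndSet(current, next)) {
        return next; // unique and higher than all previously allocated epochs
      }
    }
  }
}
```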

The only remaining difference between your proposal and the above is the actual 
commit protocol. Currently, the NN tries to commit to all replicas and only 
responds after all have succeeded, or after dropping the "errored" replicas 
from the list. I think a quorum commit is vastly superior for HA, especially 
given we'd like to collocate the log replicas on machines doing other work. 
When those machines have latency hiccups, or crash, we don't want the active NN 
to have to wait for long timeout periods before continuing. For example, using 
the current BN code, if you {{kill -STOP}} the BN, the NN will hang 
indefinitely.
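The latency argument for quorum commit can be demonstrated with a small 
simulation (illustrative only, not NN code): the writer unblocks as soon as W 
of N loggers ack, so one hung logger cannot stall it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative sketch: a commit that waits for only W of N acks, so a slow
 * or stopped logger (like a kill -STOP'd BN) cannot block the writer.
 */
class QuorumCommit {
  /** Returns true once w acks arrive within the timeout. */
  static boolean commit(long[] loggerLatenciesMs, int w, long timeoutMs)
      throws InterruptedException {
    CountDownLatch acks = new CountDownLatch(w);
    for (long latencyMs : loggerLatenciesMs) {
      Thread logger = new Thread(() -> {
        try {
          Thread.sleep(latencyMs); // simulated logger response time
        } catch (InterruptedException e) {
          return;
        }
        acks.countDown();          // this logger's ack (extras are no-ops)
      });
      logger.setDaemon(true);      // a hung logger must not keep us alive
      logger.start();
    }
    // Unblocks after the Wth ack; stragglers are simply ignored.
    return acks.await(timeoutMs, TimeUnit.MILLISECONDS);
  }
}
```

With three loggers where one hangs for a minute, a W = 2 commit still returns 
in tens of milliseconds, whereas an all-replicas commit would wait out the 
straggler's full timeout.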

My overall philosophy is the same as yours -- it should be as simple as 
possible -- but like Einstein said, no simpler! Whenever I start trying to 
design a protocol that ends up smelling just like a consensus protocol, I'd 
rather go "by the book" since I know Ben Reed and folks are way smarter than me 
and thus much more likely to get it right than an ad-hoc design.

                
> Quorum-based protocol for reading and writing edit logs
> -------------------------------------------------------
>
>                 Key: HDFS-3077
>                 URL: https://issues.apache.org/jira/browse/HDFS-3077
>             Project: Hadoop HDFS
>          Issue Type: New Feature
>          Components: ha, name-node
>            Reporter: Todd Lipcon
>            Assignee: Todd Lipcon
>
> Currently, one of the weak points of the HA design is that it relies on 
> shared storage such as an NFS filer for the shared edit log. One alternative 
> that has been proposed is to depend on BookKeeper, a ZooKeeper subproject 
> which provides a highly available replicated edit log on commodity hardware. 
> This JIRA is to implement another alternative, based on a quorum commit 
> protocol, integrated more tightly in HDFS and with the requirements driven 
> only by HDFS's needs rather than more generic use cases. More details to 
> follow.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
