[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14741972#comment-14741972 ]
Zhe Zhang commented on HDFS-9040:
---------------------------------

Great work, Walter and Jing! I agree with Jing that handling the interleaving of different error-handling events is the most challenging part. Below are my analysis and two proposed simplifications based on Jing's patch.

A non-striped {{DataStreamer}} goes through the following steps in error handling:
{code}
1) Finds error =>
2) Asks NN for new GS =>
3) Gets new GS from NN =>
4) Applies new GS to DN (createBlockOutputStream) =>
5) Ack from DN =>
6) Updates block on NN
{code}

The first simplification I want to propose is to *preallocate GS* when the NN creates a new striped block group ({{FSN#createNewBlock}}). For each new striped block group we can reserve {{NUM_PARITY_BLOCKS}} GSs. Steps 1~3 in the above sequence then collapse into a local operation: the streamer detects the error and takes the next reserved GS without a round trip to the NN. If more than {{NUM_PARITY_BLOCKS}} errors have happened, we shouldn't try to recover further anyway, because the block group has lost more internal blocks than the parity can reconstruct.

The second simplification is a *propose-accept* protocol for steps 4~6.
* The {{Coordinator}} should maintain two states:
{code}
class Coordinator {
  /** Highest proposed GS so far. Will be sent to NN when accepted by all healthy DNs */
  final AtomicLong proposedGS;
  /** Accepted GS on each DN. All reads and writes synchronize on the Coordinator */
  final long[] dnAcceptedGSs;
}
{code}
* When {{streamer_i}} fails, it checks whether its own accepted GS is equal to {{proposedGS}}. If so, it proposes a new GS.
{code}
StripedDataStreamer#processDatanodeError:
if (coordinator.dnAcceptedGSs[i] < coordinator.proposedGS.get()) {
  // Do nothing. We are sure this internal block will have a
  // smaller GS than the eventual NN copy
} else {
  // Bump proposed GS
  coordinator.proposedGS.incrementAndGet();
}
{code}
* In the normal {{run}} loop, if a streamer sees that the GS accepted by its DN is smaller than the coordinator's proposed GS, that indicates an external error (another streamer has failed).
The healthy streamer then does this:
{code}
StripedDataStreamer#updatePipelineInternal() {
  long newGS = coordinator.proposedGS.get();
  // Notify DN of the new GS
  boolean success = createBlockOutputStream(..., newGS, ...);
  if (success) {
    // Update the ith accepted GS.
    coordinator.updateDNAcceptedGS(i, newGS);
  }
}
{code}
* When {{Coordinator#updateDNAcceptedGS}} sees that {{proposedGS}} equals the accepted GS of *every* healthy streamer in {{dnAcceptedGSs}}, it calls the {{updatePipeline}} RPC to notify the NN of the new GS.
* {{dnAcceptedGSs}} can also be maintained by each individual streamer.
* This simplification is actually similar to {{updateStreamerMap}} in Jing's patch. The motivation is to establish some invariants that guarantee correctness.

I haven't finished reading Walter's patch and will post a review later.

> Erasure coding: A BlockGroupDataStreamer to rule all internal blocks streamers
> ------------------------------------------------------------------------------
>
>                 Key: HDFS-9040
>                 URL: https://issues.apache.org/jira/browse/HDFS-9040
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Walter Su
>            Assignee: Walter Su
>         Attachments: HDFS-9040.00.patch, HDFS-9040.001.wip.patch
>
>
> A {{BlockGroupDataStreamer}} to communicate with NN to allocate/update block,
> and {{StripedDataStreamer}} s only have to stream blocks to DNs.
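A minimal Java sketch of the *preallocate GS* idea from the comment, assuming an RS(6,3) layout so that {{NUM_PARITY_BLOCKS}} is 3. It also assumes the reserved range covers the group's initial GS plus {{NUM_PARITY_BLOCKS}} spare stamps. {{GsReservationSketch}}, {{Reservation}}, {{reserveForNewBlockGroup}} and {{gsAfterBumps}} are hypothetical names, not HDFS APIs, and the NN-side counter is a plain {{AtomicLong}} rather than the real generation-stamp machinery behind {{FSN#createNewBlock}}. The point it illustrates is that a streamer can compute its next GS locally from the reservation.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch (not HDFS code): the NN reserves a contiguous range of
 * generation stamps (GSs) for every new striped block group, so streamers can
 * bump the GS during recovery without asking the NN for a new one.
 */
public class GsReservationSketch {
    // Assumption: RS(6,3) layout, so at most 3 internal blocks may fail.
    static final int NUM_PARITY_BLOCKS = 3;

    /** The stamps reserved for one block group: initialGS..maxGS inclusive. */
    static final class Reservation {
        final long initialGS;
        final long maxGS;

        Reservation(long initialGS, long maxGS) {
            this.initialGS = initialGS;
            this.maxGS = maxGS;
        }

        /** GS to use after the given number of pipeline bumps, computed locally. */
        long gsAfterBumps(int bumps) {
            if (bumps < 0 || bumps > NUM_PARITY_BLOCKS) {
                // More failures than parity blocks: the group is unrecoverable anyway.
                throw new IllegalStateException("too many failures: " + bumps);
            }
            return initialGS + bumps;
        }
    }

    /** Last GS handed out by the (simulated) NN. */
    private final AtomicLong lastAllocatedGS;

    GsReservationSketch(long lastAllocatedGS) {
        this.lastAllocatedGS = new AtomicLong(lastAllocatedGS);
    }

    /** Hypothetical stand-in for the GS handling in FSN#createNewBlock. */
    Reservation reserveForNewBlockGroup() {
        // One stamp for the initial write plus NUM_PARITY_BLOCKS spares.
        long maxGS = lastAllocatedGS.addAndGet(1 + NUM_PARITY_BLOCKS);
        return new Reservation(maxGS - NUM_PARITY_BLOCKS, maxGS);
    }

    public static void main(String[] args) {
        GsReservationSketch nn = new GsReservationSketch(1000L);
        Reservation g1 = nn.reserveForNewBlockGroup();
        Reservation g2 = nn.reserveForNewBlockGroup();
        System.out.println(g1.initialGS + ".." + g1.maxGS); // 1001..1004
        System.out.println(g2.initialGS + ".." + g2.maxGS); // 1005..1008
        System.out.println(g1.gsAfterBumps(2));             // 1003
    }
}
```

With the counter at 1000, the first group reserves 1001 through 1004 and the second 1005 through 1008, so the ranges never overlap. A fourth bump on one group throws instead of silently reusing a stamp from the next group's range.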
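A minimal Java sketch of the *propose-accept* protocol from the comment, assuming a single coordinator object shared by all streamers. It keeps the comment's {{proposedGS}} and {{dnAcceptedGSs}}, adds a hypothetical per-streamer {{failed}} flag so the commit check can skip unhealthy streamers, and replaces the {{updatePipeline}} RPC with a {{LongConsumer}} callback. {{ProposeAcceptCoordinator}}, {{onStreamerFailure}}, {{needsPipelineUpdate}} and {{committedGS}} are illustrative names, not the classes in either patch. The DN side ({{createBlockOutputStream}}) is omitted: a streamer is assumed to call {{updateDNAcceptedGS}} only after its DN has acked the new GS.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch of the propose-accept protocol for striped pipeline
 * recovery. Field names follow the JIRA comment; this is not HDFS code.
 */
public class ProposeAcceptCoordinator {
    /** Highest proposed GS so far; sent to the NN once all healthy DNs accept it. */
    private final AtomicLong proposedGS;
    /** GS accepted by the DN of each internal block; guarded by this. */
    private final long[] dnAcceptedGSs;
    /** Streamers that have failed and are no longer healthy; guarded by this. */
    private final boolean[] failed;
    /** Last GS reported to the NN; guarded by this. */
    private long committedGS;
    /** Hypothetical stand-in for the updatePipeline RPC to the NN. */
    private final LongConsumer updatePipelineRpc;

    public ProposeAcceptCoordinator(int numStreamers, long initialGS,
                                    LongConsumer updatePipelineRpc) {
        this.proposedGS = new AtomicLong(initialGS);
        this.dnAcceptedGSs = new long[numStreamers];
        Arrays.fill(this.dnAcceptedGSs, initialGS);
        this.failed = new boolean[numStreamers];
        this.committedGS = initialGS;
        this.updatePipelineRpc = updatePipelineRpc;
    }

    public long getProposedGS() {
        return proposedGS.get();
    }

    /** Called from streamer i's error handling (processDatanodeError). */
    public synchronized void onStreamerFailure(int i) {
        failed[i] = true;
        if (dnAcceptedGSs[i] >= proposedGS.get()) {
            // Block i is as new as the proposal, so a new GS is needed to mark it stale.
            proposedGS.incrementAndGet();
        }
        // Otherwise block i already lags behind the GS the NN will eventually record.
        maybeCommit();
    }

    /** A proposal newer than streamer i's accepted GS signals an external error. */
    public synchronized boolean needsPipelineUpdate(int i) {
        return !failed[i] && dnAcceptedGSs[i] < proposedGS.get();
    }

    /** Called by streamer i after its DN acked the new GS. */
    public synchronized void updateDNAcceptedGS(int i, long newGS) {
        dnAcceptedGSs[i] = Math.max(dnAcceptedGSs[i], newGS);
        maybeCommit();
    }

    /** Notifies the NN once every healthy streamer has accepted proposedGS. */
    private void maybeCommit() { // caller holds the lock
        long proposed = proposedGS.get();
        if (proposed <= committedGS) {
            return; // nothing new to report
        }
        for (int j = 0; j < dnAcceptedGSs.length; j++) {
            if (!failed[j] && dnAcceptedGSs[j] != proposed) {
                return; // some healthy streamer has not accepted yet
            }
        }
        committedGS = proposed;
        updatePipelineRpc.accept(proposed);
    }

    public static void main(String[] args) {
        ProposeAcceptCoordinator c = new ProposeAcceptCoordinator(
            3, 1001L, gs -> System.out.println("updatePipeline(GS=" + gs + ")"));
        c.onStreamerFailure(0);         // proposedGS 1001 -> 1002
        c.updateDNAcceptedGS(1, 1002L); // streamer 2 has not accepted yet
        c.onStreamerFailure(2);         // 2 was behind: no bump; prints updatePipeline(GS=1002)
    }
}
```

In the {{main}} trace, the failure of streamer 0 bumps the GS once; streamer 2 then fails while still behind, so it does not bump again, and the NN is notified exactly once with GS 1002. Invoking the callback while holding the lock keeps the sketch simple; a real implementation would probably issue the RPC outside the lock.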