[ 
https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14741972#comment-14741972
 ] 

Zhe Zhang commented on HDFS-9040:
---------------------------------

Great work Walter and Jing!

I agree with Jing that handling the interleaving of different error-handling 
events is the most challenging part. Below are my analysis and two proposed 
simplifications based on Jing's patch.

A non-striped {{DataStreamer}} goes through the following steps in error 
handling:
{code}
1) Finds error => 2) Asks NN for new GS => 3) Gets new GS from NN => 4) Applies 
new GS to DN (createBlockOutputStream) => 5) Ack from DN => 6) Updates block on 
NN
{code}

The first simplification I want to propose is to *preallocate GS* when NN 
creates a new striped block group ({{FSN#createNewBlock}}). For each new 
striped block group we can reserve {{NUM_PARITY_BLOCKS}} GS's. Then steps 2~3 
in the above sequence (the round trip to NN for a new GS) can be skipped. If 
more than {{NUM_PARITY_BLOCKS}} errors have happened we shouldn't try to 
recover further anyway.
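
As a rough illustration of the preallocation idea, here is a minimal client-side sketch. The class name {{PreallocatedGenStamps}}, the value 3 for {{NUM_PARITY_BLOCKS}}, and the assumption that the reserved GS's are the consecutive values right after the block group's initial GS are all mine, not taken from either patch.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for the GS's reserved for one striped block group. */
final class PreallocatedGenStamps {
  // Assumed value for illustration only (e.g. a schema with 3 parity blocks).
  static final int NUM_PARITY_BLOCKS = 3;

  // Assumption: NN reserved the consecutive values after the initial GS.
  private final long firstReservedGS;
  private final AtomicInteger used = new AtomicInteger(0);

  /** @param initialGS the GS the block group was created with. */
  PreallocatedGenStamps(long initialGS) {
    this.firstReservedGS = initialGS + 1;
  }

  /**
   * Returns the next reserved GS without contacting NN, or -1 once all
   * NUM_PARITY_BLOCKS reserved values are used up, at which point the
   * writer should stop trying to recover.
   */
  long nextGS() {
    while (true) {
      int n = used.get();
      if (n >= NUM_PARITY_BLOCKS) {
        return -1L;
      }
      if (used.compareAndSet(n, n + 1)) {
        return firstReservedGS + n;
      }
    }
  }
}
```

With this shape, a streamer that hits an error can pick up a new GS locally, and running out of reserved values doubles as the "too many failures" signal.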

The second simplification is a *propose-accept* protocol for steps 4~6. 
* The {{Coordinator}} should maintain two states:
{code}
class Coordinator {
  /** Highest proposed GS so far. Will be sent to NN when accepted by all
   *  healthy DNs. */
  AtomicLong proposedGS;

  /** Accepted GS on each DN. Access must be synchronized on the coordinator
   *  (Java has no synchronized modifier for fields). */
  long[] dnAcceptedGSs;
}
{code}
* When {{streamer_i}} fails it checks whether its own accepted GS is equal to 
{{proposedGS}}. If so it will propose a new GS.
{code}
StripedDataStreamer#processDatanodeError:
if (coordinator.dnAcceptedGSs[i] < coordinator.proposedGS.get()) {
  // Do nothing. We are sure this internal block will have a
  // smaller GS than the eventual NN copy
} else {
  // Bump proposed GS
  coordinator.proposedGS.incrementAndGet();
}
{code}
* In the normal {{run}} loop, if a streamer sees that the GS accepted by its DN 
is smaller than the coordinator's {{proposedGS}}, that indicates an error on 
another streamer. The healthy streamer then does this:
{code}
StripedDataStreamer#updatePipelineInternal() {
  long newGS = coordinator.proposedGS.get();
  // Notify DN of the new GS
  boolean success = createBlockOutputStream(..., newGS, ...);
  if (success) {
    // Update the ith accepted GS.
    coordinator.updateDNAcceptedGS(i, newGS);
  }
}
{code}
* When {{Coordinator#updateDNAcceptedGS}} sees that *the {{dnAcceptedGSs}} of 
all healthy streamers are equal to {{proposedGS}}*, it calls the 
{{updatePipeline}} RPC to notify NN of the new GS.
* {{dnAcceptedGSs}} can also be maintained by each individual streamer.
* This simplification is actually similar to {{updateStreamerMap}} in Jing's 
patch. The motivation is to establish some invariants to guarantee correctness.
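
To make the propose-accept flow above concrete, here is a minimal self-contained sketch. The class name {{GsCoordinator}}, the {{healthy}} and {{committedGS}} bookkeeping, the {{onStreamerFailure}} method, and the {{LongConsumer}} standing in for the {{updatePipeline}} RPC are all assumptions for illustration and are not part of either patch.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical coordinator for the propose-accept protocol (steps 4~6). */
final class GsCoordinator {
  private final AtomicLong proposedGS;        // highest proposed GS so far
  private final long[] dnAcceptedGSs;         // guarded by "this"
  private final boolean[] healthy;            // guarded by "this"
  private long committedGS;                   // last GS reported to NN
  private final LongConsumer updatePipeline;  // stand-in for the NN RPC

  GsCoordinator(int numStreamers, long initialGS, LongConsumer updatePipeline) {
    this.proposedGS = new AtomicLong(initialGS);
    this.dnAcceptedGSs = new long[numStreamers];
    Arrays.fill(dnAcceptedGSs, initialGS);
    this.healthy = new boolean[numStreamers];
    Arrays.fill(healthy, true);
    this.committedGS = initialGS;
    this.updatePipeline = updatePipeline;
  }

  /** Lock-free read used by healthy streamers in their run loop. */
  long getProposedGS() {
    return proposedGS.get();
  }

  /** Called by streamer i when its DN fails. */
  synchronized void onStreamerFailure(int i) {
    healthy[i] = false;
    // Only bump if this DN had accepted the current proposal. Otherwise its
    // internal block already has a smaller GS than the eventual NN copy.
    if (dnAcceptedGSs[i] == proposedGS.get()) {
      proposedGS.incrementAndGet();
    }
    maybeCommit();
  }

  /** Called by healthy streamer i after its DN acked the new GS. */
  synchronized void updateDNAcceptedGS(int i, long gs) {
    dnAcceptedGSs[i] = gs;
    maybeCommit();
  }

  /** Reports proposedGS to NN once every healthy DN has accepted it. */
  private void maybeCommit() {
    long proposed = proposedGS.get();
    if (proposed == committedGS) {
      return; // nothing new to report
    }
    for (int i = 0; i < dnAcceptedGSs.length; i++) {
      if (healthy[i] && dnAcceptedGSs[i] != proposed) {
        return; // some healthy DN has not accepted yet
      }
    }
    committedGS = proposed;
    updatePipeline.accept(proposed);
  }
}
```

In this sketch two near-simultaneous failures produce a single bump: the second failed streamer finds its accepted GS already below {{proposedGS}} and does nothing, so NN is notified once, after the remaining healthy streamers have accepted. The sketch does not handle the case where no healthy streamer is left.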

I haven't finished reading Walter's patch and will post a review later.

> Erasure coding: A BlockGroupDataStreamer to rule all internal blocks streamers
> ------------------------------------------------------------------------------
>
>                 Key: HDFS-9040
>                 URL: https://issues.apache.org/jira/browse/HDFS-9040
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Walter Su
>            Assignee: Walter Su
>         Attachments: HDFS-9040.00.patch, HDFS-9040.001.wip.patch
>
>
> A {{BlockGroupDataStreamer}} to communicate with NN to allocate/update block, 
> and {{StripedDataStreamer}} s only have to stream blocks to DNs. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
