[jira] [Commented] (HDFS-9040) Erasure coding: A BlockGroupDataStreamer to rule all internal blocks streamers
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14743182#comment-14743182 ] Li Bo commented on HDFS-9040: - Thanks for Walter's and Jing's work. In Jing's patch, {{allocateNewBlock}} is called in {{writeChunk()}}, which ensures that all packets in a streamer's {{dataQueue}} belong to the same block. I think this is a good way to reduce the logic complexity. Since the fastest streamer already has to wait for the other streamers before requesting the next block group from the NN, I don't think the write speed will noticeably slow down. > Erasure coding: A BlockGroupDataStreamer to rule all internal blocks streamers > -- > > Key: HDFS-9040 > URL: https://issues.apache.org/jira/browse/HDFS-9040 > Project: Hadoop HDFS > Issue Type: Sub-task > Reporter: Walter Su > Assignee: Walter Su > Attachments: HDFS-9040.00.patch, HDFS-9040.001.wip.patch > > > A {{BlockGroupDataStreamer}} to communicate with NN to allocate/update block, > and {{StripedDataStreamer}} s only have to stream blocks to DNs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
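The allocation-in-{{writeChunk()}} pattern can be sketched as follows. This is a minimal illustrative sketch, not the actual patch code: the class and field names ({{StripedWriterSketch}}, {{blocksAllocated}}) are hypothetical, and the counter increment stands in for the real {{allocateNewBlock}} RPC.

```java
// Hypothetical sketch: allocating in writeChunk() means a new block is
// requested only at a block boundary, so every packet queued afterwards
// belongs to the block just allocated.
class StripedWriterSketch {
  long bytesInCurrentBlock = 0;
  final long blockSize = 4;   // tiny block size, for illustration only
  int blocksAllocated = 0;    // stands in for allocateNewBlock() RPCs

  void writeChunk(int len) {
    if (bytesInCurrentBlock == 0) {
      blocksAllocated++;       // allocate before enqueueing any packet
    }
    bytesInCurrentBlock += len;
    if (bytesInCurrentBlock >= blockSize) {
      bytesInCurrentBlock = 0; // next chunk triggers the next allocation
    }
    // the packet enqueued here belongs to the current block by construction
  }
}
```

Because allocation happens strictly at block boundaries, there is never a packet in the {{dataQueue}} for a block that has not been allocated yet.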
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14742842#comment-14742842 ] Walter Su commented on HDFS-9040: - Jing's proposal looks great. Thanks for the effort. bq. The direction here is to make sure there is no overlap between different error handling efforts and the new block allocation. 1. Totally agree. In HDFS-8383 I tried to keep the two error-handling efforts from overlapping. My method simply restarts another round of (updateBlockForPipeline, updatePipeline). Your method decouples them: you restart {{updateBlockForPipeline}} many times and call {{updatePipeline}} once at the end. So, at first I'll merge HDFS-8383.01.patch into BlockGroupDataStreamer; then I'll try to replace it with your method. 2. I hadn't considered that error handling shouldn't overlap with new-block allocation either. Your method postpones the allocation. That's great. 3. The reason I prefer not to do {{locateFollowingBlock}} in DFSOutputStream is that DFSOutputStream is async with DataStreamer, so DFSOutputStream shouldn't block during new-block allocation. (Well, it does block when the dataQueue is congested.) bq. The complicated part is, when a streamer#0 ends, you can't bump GS for it. 4. You forgot this issue. DataStreamer waits for the {{ackQueue}} to be empty before it closes blockStream. With {{BlockGroupDataStreamer}} I can make all 9 internal streamers wait for error handling to finish; only then do I put the empty last packet to all 9 internal streamers to let them close their blockStreams. (It slows down the fastest streamer. That's a trade-off.) 5. It's great you did streamer replacement. It makes HDFS-8704 very easy.
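The decoupling in point 1 (bump the GS possibly many times, commit to the NN once at the end) can be sketched like this. All names here are illustrative, with {{bumpRpcs}} standing in for {{updateBlockForPipeline}} calls and {{commitRpcs}} for the single final {{updatePipeline}} call:

```java
// Illustrative sketch of decoupled recovery: a fresh GS per failure round,
// one NN commit at the end. Names and structure are hypothetical.
class RecoverySketch {
  long gs;
  int bumpRpcs = 0;    // stands in for updateBlockForPipeline calls
  int commitRpcs = 0;  // stands in for updatePipeline calls

  RecoverySketch(long initialGS) {
    gs = initialGS;
  }

  /** failuresPerRound[k] = new failures seen while recreating streams in round k. */
  long recover(int[] failuresPerRound) {
    for (int failures : failuresPerRound) {
      gs++;            // fresh GS for this recovery round
      bumpRpcs++;
      if (failures == 0) {
        break;         // all remaining streamers reconnected cleanly
      }
      // otherwise: mark the failed streamers and retry with another GS
    }
    commitRpcs++;      // single updatePipeline with the final GS
    return gs;
  }
}
```

With one failure during the first retry round and a clean second round, the GS is bumped twice but the NN is updated exactly once.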
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14742796#comment-14742796 ] Walter Su commented on HDFS-9040: - bq. preallocate GS when NN creates a new striped block group (FSN#createNewBlock). For each new striped block group we can reserve NUM_PARITY_BLOCKS GS's. Then steps 1~3 in the above sequence can be saved. Good idea, Zhe. One concern: every call to {{createBlockOutputStream}} requires a fresh blockToken. (It doesn't matter if the token expires during streaming.)
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14741972#comment-14741972 ] Zhe Zhang commented on HDFS-9040: - Great work Walter and Jing! I agree with Jing that handling the interleaving of different error-handling events is the most challenging part. Below are my analysis and 2 proposed simplifications based on Jing's patch. A non-striped {{DataStreamer}} goes through the following steps in error handling: {code} 1) Finds error => 2) Asks NN for new GS => 3) Gets new GS from NN => 4) Applies new GS to DN (createBlockOutputStream) => 5) Ack from DN => 6) Updates block on NN {code} The first simplification I want to propose is to *preallocate GS* when NN creates a new striped block group ({{FSN#createNewBlock}}). For each new striped block group we can reserve {{NUM_PARITY_BLOCKS}} GS's. Then steps 1~3 in the above sequence can be saved. If more than {{NUM_PARITY_BLOCKS}} errors have happened we shouldn't try to recover further anyway. The second simplification is a *propose-accept* protocol for steps 4~6. * The {{Coordinator}} should maintain two states: {code} class Coordinator { /** Highest proposed GS so far. Will be sent to NN when accepted by all healthy DNs */ AtomicLong proposedGS; /** Accepted GS on each DN; guarded by the Coordinator lock */ long[] dnAcceptedGSs; } {code} * When {{streamer_i}} fails it checks whether its own accepted GS is equal to {{proposedGS}}. If so it will propose a new GS. {code} StripedDataStreamer#processDatanodeError: if (coordinator.dnAcceptedGSs[i] < coordinator.proposedGS.get()) { // Do nothing. We are sure this internal block will have a // smaller GS than the eventual NN copy } else { // Bump proposed GS coordinator.proposedGS.getAndAdd(1); } {code} * In the normal {{run}} loop, if a streamer sees that its DN-side GS is smaller than the coordinator's proposed GS, that indicates an external error.
The healthy streamer then does this: {code} StripedDataStreamer#updatePipelineInternal() { long newGS = coordinator.proposedGS.get(); // Notify DN of the new GS success = createBlockOutputStream(..., newGS, ...); if (success) { // Update the ith accepted GS. coordinator.updateDNAcceptedGS(i, newGS); } } {code} * When {{Coordinator#updateDNAcceptedGS}} sees that *{{proposedGS}} is equal to {{dnAcceptedGSs}}* of all healthy streamers, it calls the {{updatePipeline}} RPC to notify NN of the new GS. * {{dnAcceptedGSs}} can also be maintained by each individual streamer. * This simplification is actually similar to {{updateStreamerMap}} in Jing's patch. The motivation is to establish some invariants to guarantee correctness. I haven't finished reading Walter's patch and will post a review later.
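The propose-accept bookkeeping above can be sketched as a self-contained class. This is an illustrative sketch, not the patch: the {{healthy}} array and the {{nnGS}} field (standing in for the {{updatePipeline}} RPC) are assumptions added to make the example complete, and concurrency is simplified to the synchronized accept path.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the propose-accept protocol described above.
class CoordinatorSketch {
  final AtomicLong proposedGS;
  final long[] dnAcceptedGSs;   // accepted GS per DN; guarded by this
  final boolean[] healthy;      // assumed health tracking per streamer
  long nnGS;                    // stands in for the updatePipeline RPC to NN

  CoordinatorSketch(int numStreamers, long initialGS) {
    proposedGS = new AtomicLong(initialGS);
    dnAcceptedGSs = new long[numStreamers];
    healthy = new boolean[numStreamers];
    for (int i = 0; i < numStreamers; i++) {
      dnAcceptedGSs[i] = initialGS;
      healthy[i] = true;
    }
    nnGS = initialGS;
  }

  /** streamer_i failed: bump proposedGS only if streamer_i was up to date. */
  synchronized void onStreamerFailure(int i) {
    healthy[i] = false;
    if (dnAcceptedGSs[i] >= proposedGS.get()) {
      proposedGS.getAndAdd(1);
    }
    // else: this internal block already has a smaller GS than the eventual NN copy
  }

  /** Called after createBlockOutputStream succeeds with the new GS. */
  synchronized void updateDNAcceptedGS(int i, long newGS) {
    dnAcceptedGSs[i] = newGS;
    long p = proposedGS.get();
    for (int j = 0; j < dnAcceptedGSs.length; j++) {
      if (healthy[j] && dnAcceptedGSs[j] != p) {
        return; // some healthy DN has not accepted the proposed GS yet
      }
    }
    nnGS = p;   // all healthy DNs accepted: stands in for the updatePipeline RPC
  }
}
```

The invariant is that the NN-side GS only advances once every healthy DN has accepted the proposed GS, so a failed streamer's block always ends up with a GS strictly smaller than the NN copy.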
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14741388#comment-14741388 ] Jing Zhao commented on HDFS-9040: - Thanks for the patch, Walter! I think this looks much clearer compared with the current implementation. Some thoughts and comments: # In general I think it's the correct direction to push all the coordination logic into one place and let all the other streamers simply transfer data. # Currently the new block allocation step and the failure handling steps can still be interleaved, which makes it too hard to guarantee correctness. For example, we need to handle a scenario where some data streamer has not fetched the new block yet when the coordinator starts handling a failure. The current patch tries to handle this by checking the corresponding following-block queue. But since a data streamer can be in a state where it has fetched the new block but has not yet assigned new values to its nodes/storageTypes, we may still have a race condition. Thus I agree with Nicholas's comment [here|https://issues.apache.org/jira/browse/HDFS-8383?focusedCommentId=14737962&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14737962], i.e., we need to add some "barriers" to sync all the data streamers and thus simplify the problem. # More specifically, my current proposal for failure handling looks like this. The coordinator side: #* Periodically check for failures. If we use DFSStripedOutputStream as the coordinator, we can easily do this in {{writeChunk}}, e.g., check for failures whenever we've received one stripe of data. #* If there is a new failure, first wait till all the healthy streamers have fetched the new block and are in the DATA_STREAMING stage. #* Mark all the healthy streamers as being in external error state. #* Call updateBlockForPipeline and get the new GS.
#* Wait till all the healthy streamers fetch the new block from the queue and create new block streams. #* If a new failure happens while creating the new block streams, notify all the remaining streamers of the failure and keep them in the external error state. Repeat the above steps. #* Otherwise reset all the external error states and make the updatePipeline RPC call. Then notify all the streamers that this failure handling session has succeeded. # The DataStreamer side: #* When finding itself in external error state, wait and take the new block from the blocking queue. #* Create a new datanode connection using the new block. #* Notify the coordinator of the result of the new datanode connection creation. #* If the connection creation succeeded, wait for the coordinator's overall result. #* If all the involved streamers succeed, update its block based on the new GS. #* Otherwise repeat the steps. #* Instead of overriding updateBlockForPipeline and updatePipeline, it may be easier to implement the above logic by overriding {{setupPipelineForAppendOrRecovery}}. # Obviously the above proposal may still have some holes, but the direction here is to make sure there is no overlap between the different error handling efforts and the new block allocation. Please see if this makes sense to you. # Also I think it is easier to implement the above logic in StripedOutputStream: 1) it's easier to determine when to start block allocation and the failure check, and 2) it's easier to handle exceptions during the NN RPCs since we do not need to pass the exception from a separate coordinator thread. But we can discuss this further; please let me know if I missed something. Currently I have an in-progress patch implementing the above proposal. I will try to get it into better shape and post it as a demo soon.
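The per-stripe failure check in {{writeChunk}} suggested above can be sketched minimally. The names are hypothetical and the {{checks}} counter stands in for an assumed {{checkStreamerFailures()}} call; the point is only that the check fires once per completed stripe, not per cell:

```java
// Hypothetical sketch: the coordinator checks for streamer failures
// once per completed stripe rather than on every write.
class StripeCheckSketch {
  final int cellsPerStripe;
  int cellsWritten = 0;
  int checks = 0;             // stands in for checkStreamerFailures() calls

  StripeCheckSketch(int cellsPerStripe) {
    this.cellsPerStripe = cellsPerStripe;
  }

  void writeCell() {
    cellsWritten++;
    if (cellsWritten % cellsPerStripe == 0) {
      checks++;               // one full stripe received: run the failure check
    }
  }
}
```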
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738730#comment-14738730 ] Walter Su commented on HDFS-9040: - bq. The role of BlockGroupDataStreamer looks similar (or closely related) to Coordinator. Should we make it part of Coordinator? It's not quite like that. The old way requires switching streamers. With BGDataStreamer, the data flow becomes DFSStripedOutputStream --> BGDataStreamer --> StripedDataStreamer(s). bq. I think a challenge is whether / how to reuse the existing DFSOutputStream / DataStreamer code. It doesn't seem easy to avoid refactoring them. bq. Implementing a block group writer is a better solution but you have to copy a lot of existing code to the new class, which may be hard to be... Thanks for sharing the information. I'll try my best to avoid that. I'm still working on this and will post a demo soon.
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738076#comment-14738076 ] Li Bo commented on HDFS-9040: - Implementing a block group writer is a better solution, but you have to copy a lot of existing code into the new class, which may be hard for the community to accept. At the beginning I also tried this way but gave up after discussion. The current design and implementation make as few changes as possible to the existing code.
[ https://issues.apache.org/jira/browse/HDFS-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14736179#comment-14736179 ] Zhe Zhang commented on HDFS-9040: - Thanks for initiating the work, Walter! While you are working on the patch, I have a couple of questions: # The role of {{BlockGroupDataStreamer}} looks similar (or closely related) to {{Coordinator}}. Should we make it part of {{Coordinator}}? # I think a challenge is whether / how to reuse the existing {{DFSOutputStream}} / {{DataStreamer}} code. It doesn't seem easy to avoid refactoring them.