[ https://issues.apache.org/jira/browse/HDFS-15925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042647#comment-18042647 ]

ASF GitHub Bot commented on HDFS-15925:
---------------------------------------

github-actions[bot] closed pull request #2821: HDFS-15925. The lack of 
packet-level mirrorError state synchronization in BlockReceiver can cause the 
HDFS client to hang
URL: https://github.com/apache/hadoop/pull/2821




> The lack of packet-level mirrorError state synchronization in 
> BlockReceiver$PacketResponder can cause the HDFS client to hang
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HDFS-15925
>                 URL: https://issues.apache.org/jira/browse/HDFS-15925
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: datanode
>    Affects Versions: 3.2.2
>            Reporter: Haoze Wu
>            Priority: Critical
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
>     When the datanode is receiving data block packets from an HDFS client and 
> forwarding these packets to a mirror (another datanode) at the same time, a 
> single IOException in the datanode’s forwarding path can cause the client to 
> hang for 1 minute without any logging. After that minute, the client’s log 
> shows an EOFException warning and `Slow waitForAckedSeqno took 60106ms 
> (threshold=30000ms)`.
>     Normally the datanode informs the client of this error state 
> immediately, and the client then resends the packets right away, so the 
> whole recovery is very fast. After careful analysis, we found that the above 
> symptom is caused by the lack of packet-level mirrorError state 
> synchronization in BlockReceiver$PacketResponder: under a particular thread 
> interleaving, BlockReceiver$PacketResponder hangs for 1 minute and then 
> exits without sending the error state to the client.
> *Root Cause Analysis* 
> {code:java}
> //hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
> class BlockReceiver implements Closeable {
>   // ...
>   private void handleMirrorOutError(IOException ioe) throws IOException {
>     // ...
>     if (Thread.interrupted()) {
>       throw ioe;
>     } else { // encounter an error while writing to mirror
>       // continue to run even if can not write to mirror
>       // notify client of the error
>       // and wait for the client to shut down the pipeline
>       mirrorError = true;                                            // line 461
>     }
>   }
>   private int receivePacket() throws IOException {
>     // read the next packet
>     packetReceiver.receiveNextPacket(in);                            // line 528
>     // ...
>     boolean lastPacketInBlock = header.isLastPacketInBlock();        // line 551
>     //First write the packet to the mirror:
>     if (mirrorOut != null && !mirrorError) {
>       try {
>         // ...
>         packetReceiver.mirrorPacketTo(mirrorOut);                    // line 588
>         // ...
>       } catch (IOException e) {
>         handleMirrorOutError(e);                                     // line 604
>       }
>     }
>     // ...
>     return lastPacketInBlock?-1:len;                                 // line 849
>   }
>   void receiveBlock(...) throws IOException {
>     // ...
>     try {
>       if (isClient && !isTransfer) {
>         responder = new Daemon(datanode.threadGroup, 
>             new PacketResponder(replyOut, mirrIn, downstreams));
>         responder.start();                                           // line 968
>       }
>       while(receivePacket() >= 0){/*Receive until the last packet*/} // line 971
>       // wait for all outstanding packet responses. And then
>       // indicate responder to gracefully shutdown.
>       // Mark that responder has been closed for future processing
>       if (responder != null) {
>         ((PacketResponder)responder.getRunnable()).close();          // line 977
>         responderClosed = true;
>       }
>       // ...
>     } catch (IOException ioe) {                                      // line 1003
>       // ...
>     } finally {
>       // ...
>       if (!responderClosed) { // Data transfer was not complete.
>         if (responder != null) {
>           // ...
>           responder.interrupt();                                     // line 1046
>         }
>         // ...
>       }
>       if (responder != null) {
>         try {
>           responder.interrupt();                                     // line 1053
>           // ...
>         } catch (InterruptedException e) {
>           responder.interrupt();                                     // line 1067
>           // ...
>         }
>         // ...
>       }
>     }
>   }
> }
> {code}
>     In the `BlockReceiver.receivePacket` method, if the datanode fails to 
> forward the packet to the mirror (line 588) because of an IOException, the 
> exception is handled at line 604, which sets the mirrorError flag at line 461. 
> According to the comments, the BlockReceiver keeps running in the mirrorError 
> state, and the client is supposed to be notified of the error.
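To make "the client is notified" concrete, here is a simplified sketch of the three branches of `sendAckUpstreamUnprotected` shown in the PacketResponder excerpt. It assumes plain enum statuses instead of the int headers built by `PipelineAck.combineHeader` (ECN bits are ignored), and the normal-case branch (own status followed by the downstream statuses) is our own illustration, since the excerpt elides it. `AckReplySketch` and `buildReplies` are hypothetical names.

```java
import java.util.Arrays;

/**
 * Simplified model of the reply PacketResponder sends upstream.
 * Assumption: statuses are enum values, not combined int headers.
 */
public class AckReplySketch {
  public enum Status { SUCCESS, ERROR }

  /**
   * @param mirrorError whether forwarding to the mirror failed
   * @param myStatus    this datanode's own status
   * @param downstream  statuses read from the downstream ack; null models "no ack"
   */
  public static Status[] buildReplies(boolean mirrorError, Status myStatus, Status[] downstream) {
    if (downstream == null) {
      // Mirrors the `ack == null` branch: only this node's status is reported.
      return new Status[] {myStatus};
    }
    if (mirrorError) {
      // This node got the packet, but its mirror is reported as failed,
      // so the client can pick out the bad node and rebuild the pipeline.
      return new Status[] {Status.SUCCESS, Status.ERROR};
    }
    // Assumed normal case: own status first, then the downstream statuses.
    Status[] replies = new Status[downstream.length + 1];
    replies[0] = myStatus;
    System.arraycopy(downstream, 0, replies, 1, downstream.length);
    return replies;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(
        buildReplies(true, Status.SUCCESS, new Status[0])));                  // [SUCCESS, ERROR]
    System.out.println(Arrays.toString(
        buildReplies(false, Status.SUCCESS, new Status[] {Status.SUCCESS}))); // [SUCCESS, SUCCESS]
  }
}
```

The point of the sketch is that the ERROR reply can only be produced if the responder reaches this code; if it is parked in the blocking downstream read, nothing is sent.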
>     However, jstack shows that the datanode is stuck in both the `DataXceiver` 
> thread (which receives data block packets from the client) and the 
> `BlockReceiver$PacketResponder` thread (which sends ACK packets to the client). 
> In particular, the `DataXceiver` thread is stuck in the loop at line 971 
> because it is blocked at line 528: the `lastPacketInBlock` packet has not 
> arrived, and no more packets are coming in.
> {code:java}
> //hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
> class BlockReceiver implements Closeable {
>   // ...
>   class PacketResponder implements Runnable, Closeable {
>     // ...
>     public void run() {
>       // ...
>       while (isRunning() && !lastPacketInBlock) {
>         // ...
>         try {
>           // ...
>           PipelineAck ack = new PipelineAck();
>           // ...
>           try {
>             if (... && !mirrorError) {                               // line 1381
>               // ...
>               // read an ack from downstream datanode
>               ack.readFields(downstreamIn);                          // line 1384
>               // ...
>             }
>             // ...
>           } catch (InterruptedException ine) {
>             isInterrupted = true;                                    // line 1434
>           } catch (IOException ioe) {
>             if (Thread.interrupted()) {
>               isInterrupted = true;                                  // line 1437
>             } else ...
>           }
>           if (Thread.interrupted() || isInterrupted) {               // line 1458
>             // ...
>             LOG.info(myString + ": Thread is interrupted.");
>             running = false;
>             continue;                                                // line 1472
>           }
>           // ...
>           sendAckUpstream(ack, expected, totalAckTimeNanos,          // line 1481
>             (pkt != null ? pkt.offsetInBlock : 0),
>             PipelineAck.combineHeader(datanode.getECN(), myStatus));
>           // ...
>         } catch (IOException e) {
>           // ...
>         } catch (Throwable e) {
>           // ...
>         }
>       }
>       LOG.info(myString + " terminating");
>     }
>     private void sendAckUpstream(...) throws IOException {
>       try {
>         // ...
>         try {
>           if (!running) return;
>           sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,  // line 1568
>               offsetInBlock, myHeader);
>         } finally {
>           // ...
>         }
>       } catch (InterruptedException ie) {
>         // ...
>       }
>     }
>     private void sendAckUpstreamUnprotected(...) throws IOException {
>       final int[] replies;
>       if (ack == null) {
>         // ...
>         replies = new int[] { myHeader };
>       } else if (mirrorError) { // ack read error
>         int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
>         int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
>         replies = new int[] {h, h1};                                 // line 1602
>       } else {
>         // ...
>       }
>       PipelineAck replyAck = new PipelineAck(seqno, replies,
>           totalAckTimeNanos);
>       // ...
>       replyAck.write(upstreamOut);                                   // line 1632
>       // ...
>     }
>   }
> }
> {code}
>     The `BlockReceiver$PacketResponder` thread checks the mirrorError flag at 
> line 1381 while the `DataXceiver` thread runs concurrently. If 
> `BlockReceiver$PacketResponder` finds that mirrorError is false, it tries to 
> read the ACK packet from downstream (the mirror, i.e. another datanode) at 
> line 1384, which is a blocking call.
>     This creates a race condition. If the mirrorError flag set by 
> `handleMirrorOutError` is already visible at line 1381, the 
> `BlockReceiver$PacketResponder` thread skips the blocking network I/O call at 
> line 1384 and instead proceeds to line 1481, then line 1568, and then line 
> 1632. According to the code around line 1602, this ACK contains 
> `Status.ERROR`, which warns the client. Conversely, if the mirrorError flag is 
> set after the check at line 1381, the `BlockReceiver$PacketResponder` thread 
> blocks at line 1384. In our scenario, the data block packet was never sent to 
> the mirror datanode because of the IOException, so the mirror datanode never 
> sends the corresponding ACK packet either. Therefore, the 
> `BlockReceiver$PacketResponder` thread stays blocked there for a long time.
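The interleaving above is a check-then-act race. The sketch below reproduces it outside HDFS under stated assumptions: a `LinkedBlockingQueue` stands in for `downstreamIn`, a `poll` timeout stands in for the socket read timeout, and two `CountDownLatch`es force the bad ordering (the flag is set just after the responder has checked it). `MirrorErrorRaceSketch` and `responderWaitMs` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Check-then-block race between the receiver and the responder (simulation only). */
public class MirrorErrorRaceSketch {
  private volatile boolean mirrorError = false;
  // Hypothetical stand-in for the downstream socket: acks the mirror would send.
  private final BlockingQueue<Long> downstreamAcks = new LinkedBlockingQueue<>();

  /** Returns roughly how long (ms) the responder blocked waiting for an ack. */
  public long responderWaitMs(long readTimeoutMs) throws InterruptedException {
    CountDownLatch flagChecked = new CountDownLatch(1);
    CountDownLatch errorSet = new CountDownLatch(1);
    long[] waitedMs = {0};

    Thread responder = new Thread(() -> {
      try {
        if (!mirrorError) {                  // like line 1381: flag is still false
          flagChecked.countDown();
          errorSet.await();                  // receiver fails right after the check
          long start = System.nanoTime();
          // Like line 1384: no ack will arrive, because the packet was never forwarded.
          downstreamAcks.poll(readTimeoutMs, TimeUnit.MILLISECONDS);
          waitedMs[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    responder.start();

    // Receiver side: forwarding fails after the responder has already checked the flag.
    flagChecked.await();
    mirrorError = true;                      // like line 461; nothing was sent, so no ack
    errorSet.countDown();

    responder.join();                        // join() makes waitedMs[0] visible here
    return waitedMs[0];
  }

  public static void main(String[] args) throws InterruptedException {
    long waited = new MirrorErrorRaceSketch().responderWaitMs(200);
    System.out.println("responder blocked for about " + waited + " ms although mirrorError=true");
  }
}
```

Without the latches, which branch is taken depends on thread scheduling, which fits the observation that the hang only appears occasionally.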
> *Fix*
>     The key is to avoid the problematic concurrency between 
> `BlockReceiver#receivePacket` and the reading of the ACK packet (from the 
> downstream mirror datanode) in `BlockReceiver$PacketResponder`. The simplest 
> way to do this is to grant `BlockReceiver$PacketResponder` exactly one chance 
> to check the mirrorError state and read the ACK with the blocking I/O call 
> each time `BlockReceiver#receivePacket` successfully forwards a packet to the 
> downstream mirror datanode. This is reasonable because if the datanode has 
> not sent a packet, `BlockReceiver$PacketResponder` cannot possibly receive 
> the corresponding ACK.
>     The implementation only needs a semaphore in 
> `BlockReceiver$PacketResponder` and does not affect any other components.
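A minimal sketch of the semaphore idea, not the actual patch from pull request #2821: `onPacketForwarded` is called after a successful `mirrorPacketTo`, and the responder acquires one permit before its `!mirrorError` check. One assumption goes beyond the description above: `onMirrorError` also releases a permit after setting the flag, so the responder wakes up and sends the SUCCESS/ERROR reply instead of waiting for a packet that will never be forwarded. `MirrorAckGate` and its methods are hypothetical names.

```java
import java.util.concurrent.Semaphore;

/** Gate that lets the responder attempt one blocking ack read per forwarded packet. */
public class MirrorAckGate {
  private final Semaphore forwarded = new Semaphore(0);
  private volatile boolean mirrorError = false;

  /** Receiver: call after a packet has been written to the mirror successfully. */
  public void onPacketForwarded() {
    forwarded.release();
  }

  /** Receiver: call from the mirror-error handler (assumed hook, see lead-in). */
  public void onMirrorError() {
    mirrorError = true;     // publish the flag first...
    forwarded.release();    // ...then wake the responder so it observes the flag
  }

  /**
   * Responder: waits for one permit, then decides whether a blocking downstream
   * read is safe. Returns true to read the ack, false to reply with an error.
   */
  public boolean awaitTurnToReadAck() throws InterruptedException {
    forwarded.acquire();
    return !mirrorError;
  }

  public static void main(String[] args) throws InterruptedException {
    MirrorAckGate gate = new MirrorAckGate();
    gate.onPacketForwarded();                       // packet 1 reached the mirror
    System.out.println(gate.awaitTurnToReadAck());  // true: its ack will arrive
    gate.onMirrorError();                           // packet 2 failed to reach the mirror
    System.out.println(gate.awaitTurnToReadAck());  // false: send SUCCESS/ERROR instead
  }
}
```

Writing the flag before `release()` relies on the happens-before guarantee documented for `java.util.concurrent.Semaphore`, so a responder woken by `onMirrorError` should observe `mirrorError == true` and never enter the blocking read for an unsent packet.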
> *P.S.*
>     Here we only discuss the reasoning behind the symptom and the fix. This 
> bug is also related to some client-side behaviors, but that reasoning is more 
> complex. Our complete analysis 
> ([https://docs.google.com/document/d/1Hq1qhbNFfS7y9zTNZ0VXsN3rxqExlMPaAqz4RfCurpE/edit?usp=sharing]) 
> covers the packet receiving and sending threads of both the datanode and the 
> client, and explains how the injected IOException can leave these 4 threads 
> stuck in a "deadlock".
> *Reproduction*
>     Start HDFS (1 namenode, 2 datanodes) with the default configuration. Then 
> run a client (we used the command `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt` 
> in the terminal). For each data block packet the client sends to the 
> datanode, the datanode forwards it at line 588 of `BlockReceiver.java` 
> ([https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java#L588]). 
> Inject a single IOException there.
>     Most of the time, the thread interleaving needed to trigger this bug does 
> not occur. The most reliable way we have found to reproduce it is to set 
> `dfs.blocksize` to `1m` in `hdfs-site.xml`, and then run 
> `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt`, where `./foo.txt` is a 15MB 
> file (generated with `fallocate -l 15000000 foo.txt`). Then perform the 
> injection described above at the time of the 12th occurrence of 
> [https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java#L747].
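The issue does not say which tool was used for fault injection. As a hypothetical illustration of the timing described above, the injector below counts occurrences of a trigger point (standing in for DataXceiver.java line 747) and throws exactly one IOException at the first forwarding call (standing in for BlockReceiver.java line 588) after the 12th occurrence. This reading of "at the time of the 12th occurrence" is our interpretation; `OneShotFaultInjector`, the one-event-per-block assumption, and the three packets per block in `main` are all made up for the example.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical one-shot injector: fails once, after N trigger events. */
public class OneShotFaultInjector {
  private final int armAfterEvents;
  private final AtomicInteger events = new AtomicInteger();
  private final AtomicBoolean fired = new AtomicBoolean(false);

  public OneShotFaultInjector(int armAfterEvents) {
    this.armAfterEvents = armAfterEvents;
  }

  /** Call at the timing site (assumed: the DataXceiver.java#L747 point). */
  public void recordEvent() {
    events.incrementAndGet();
  }

  /** Call at the injection site (assumed: just before mirrorPacketTo). */
  public void maybeFail() throws IOException {
    // Throw at most once, and only after the trigger count has been reached.
    if (events.get() >= armAfterEvents && fired.compareAndSet(false, true)) {
      throw new IOException("Injected fault after trigger event #" + armAfterEvents);
    }
  }

  public static void main(String[] args) {
    OneShotFaultInjector injector = new OneShotFaultInjector(12);
    int failures = 0;
    int failedBlock = -1;
    // A 15000000-byte file with a 1m (1048576-byte) block size spans 15 blocks;
    // we assume one trigger event per block.
    for (int block = 1; block <= 15; block++) {
      injector.recordEvent();
      for (int packet = 0; packet < 3; packet++) {   // made-up packet count per block
        try {
          injector.maybeFail();
        } catch (IOException e) {
          failures++;
          failedBlock = block;
        }
      }
    }
    System.out.println(failures + " fault(s), in block " + failedBlock); // 1 fault(s), in block 12
  }
}
```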



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
