[
https://issues.apache.org/jira/browse/HDFS-15925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042346#comment-18042346
]
ASF GitHub Bot commented on HDFS-15925:
---------------------------------------
github-actions[bot] commented on PR #2821:
URL: https://github.com/apache/hadoop/pull/2821#issuecomment-3604493805
We're closing this stale PR because it has been open for 100 days with no
activity. This isn't a judgement on the merit of the PR in any way. It's just a
way of keeping the PR queue manageable.
If you feel like this was a mistake, or you would like to continue working
on it, please feel free to re-open it and ask for a committer to remove the
stale tag and review again.
Thanks all for your contribution.
> The lack of packet-level mirrorError state synchronization in
> BlockReceiver$PacketResponder can cause the HDFS client to hang
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: HDFS-15925
> URL: https://issues.apache.org/jira/browse/HDFS-15925
> Project: Hadoop HDFS
> Issue Type: Bug
> Components: datanode
> Affects Versions: 3.2.2
> Reporter: Haoze Wu
> Priority: Critical
> Labels: pull-request-available
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> When the datanode is receiving data block packets from an HDFS client and
> forwarding these packets to a mirror (another datanode) simultaneously, a
> single IOException in the datanode’s forwarding path can cause the client to
> get stuck for 1 min, without any logging. After 1 min, the client’s log shows
> a warning of EOFException and `Slow waitForAckedSeqno took 60106ms
> (threshold=30000ms)`.
> Normally the datanode would inform the client of this error state
> immediately, and the client would then resend the packets immediately; the
> whole process is very fast. After careful analysis, we find that the above
> symptom is due to the lack of packet-level mirrorError state synchronization
> in BlockReceiver$PacketResponder: under a particular thread interleaving, the
> BlockReceiver$PacketResponder hangs for 1 min and then exits, without
> sending the error state to the client.
> *Root Cause Analysis*
> {code:java}
> // hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
> class BlockReceiver implements Closeable {
>   // ...
>   private void handleMirrorOutError(IOException ioe) throws IOException {
>     // ...
>     if (Thread.interrupted()) {
>       throw ioe;
>     } else { // encounter an error while writing to mirror
>       // continue to run even if can not write to mirror
>       // notify client of the error
>       // and wait for the client to shut down the pipeline
>       mirrorError = true; // line 461
>     }
>   }
>
>   private int receivePacket() throws IOException {
>     // read the next packet
>     packetReceiver.receiveNextPacket(in); // line 528
>     // ...
>     boolean lastPacketInBlock = header.isLastPacketInBlock(); // line 551
>     // First write the packet to the mirror:
>     if (mirrorOut != null && !mirrorError) {
>       try {
>         // ...
>         packetReceiver.mirrorPacketTo(mirrorOut); // line 588
>         // ...
>       } catch (IOException e) {
>         handleMirrorOutError(e); // line 604
>       }
>     }
>     // ...
>     return lastPacketInBlock ? -1 : len; // line 849
>   }
>
>   void receiveBlock(...) throws IOException {
>     // ...
>     try {
>       if (isClient && !isTransfer) {
>         responder = new Daemon(datanode.threadGroup,
>             new PacketResponder(replyOut, mirrIn, downstreams));
>         responder.start(); // line 968
>       }
>       while (receivePacket() >= 0) { /* Receive until the last packet */ } // line 971
>       // wait for all outstanding packet responses. And then
>       // indicate responder to gracefully shutdown.
>       // Mark that responder has been closed for future processing
>       if (responder != null) {
>         ((PacketResponder) responder.getRunnable()).close(); // line 977
>         responderClosed = true;
>       }
>       // ...
>     } catch (IOException ioe) { // line 1003
>       // ...
>     } finally {
>       // ...
>       if (!responderClosed) { // Data transfer was not complete.
>         if (responder != null) {
>           // ...
>           responder.interrupt(); // line 1046
>         }
>         // ...
>       }
>       if (responder != null) {
>         try {
>           responder.interrupt(); // line 1053
>           // ...
>         } catch (InterruptedException e) {
>           responder.interrupt(); // line 1067
>           // ...
>         }
>         // ...
>       }
>     }
>   }
> }
> {code}
> In the `BlockReceiver.receivePacket` method, if the datanode fails to
> forward the packet to the mirror (line 588) due to an IOException, the
> exception is handled at line 604, which sets the mirrorError flag at line
> 461. According to the comments, the BlockReceiver keeps running with the
> mirrorError state, and the client should be notified of the error.
> However, jstack shows that the datanode gets stuck in the `DataXceiver`
> thread (receiving data block packets from the client) and the
> `BlockReceiver$PacketResponder` thread (replying ACK packets to the client).
> In particular, the `DataXceiver` thread gets stuck in the loop at line 971,
> which in turn is caused by blocking at line 528, meaning that the
> `lastPacketInBlock` packet has not arrived and no more packets are coming in.
> {code:java}
> // hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
> class BlockReceiver implements Closeable {
>   // ...
>   class PacketResponder implements Runnable, Closeable {
>     // ...
>     public void run() {
>       // ...
>       while (isRunning() && !lastPacketInBlock) {
>         // ...
>         try {
>           // ...
>           PipelineAck ack = new PipelineAck();
>           // ...
>           try {
>             if (... && !mirrorError) { // line 1381
>               // ...
>               // read an ack from downstream datanode
>               ack.readFields(downstreamIn); // line 1384
>               // ...
>             }
>             // ...
>           } catch (InterruptedException ine) {
>             isInterrupted = true; // line 1434
>           } catch (IOException ioe) {
>             if (Thread.interrupted()) {
>               isInterrupted = true; // line 1437
>             } else ...
>           }
>           if (Thread.interrupted() || isInterrupted) { // line 1458
>             // ...
>             LOG.info(myString + ": Thread is interrupted.");
>             running = false;
>             continue; // line 1472
>           }
>           // ...
>           sendAckUpstream(ack, expected, totalAckTimeNanos, // line 1481
>               (pkt != null ? pkt.offsetInBlock : 0),
>               PipelineAck.combineHeader(datanode.getECN(), myStatus));
>           // ...
>         } catch (IOException e) {
>           // ...
>         } catch (Throwable e) {
>           // ...
>         }
>       }
>       LOG.info(myString + " terminating");
>     }
>
>     private void sendAckUpstream(...) throws IOException {
>       try {
>         // ...
>         try {
>           if (!running) return;
>           sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos, // line 1568
>               offsetInBlock, myHeader);
>         } finally {
>           // ...
>         }
>       } catch (InterruptedException ie) {
>         // ...
>       }
>     }
>
>     private void sendAckUpstreamUnprotected(...) throws IOException {
>       final int[] replies;
>       if (ack == null) {
>         // ...
>         replies = new int[] { myHeader };
>       } else if (mirrorError) { // ack read error
>         int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
>         int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
>         replies = new int[] { h, h1 }; // line 1602
>       } else {
>         // ...
>       }
>       PipelineAck replyAck = new PipelineAck(seqno, replies,
>           totalAckTimeNanos);
>       // ...
>       replyAck.write(upstreamOut); // line 1632
>       // ...
>     }
>   }
> }
> {code}
> The `BlockReceiver$PacketResponder` thread checks the mirrorError flag in
> line 1381. The `DataXceiver` thread runs concurrently. If
> `BlockReceiver$PacketResponder` finds that mirrorError is false, it tries to
> read the ACK packet from downstream (the mirror, another datanode) in line
> 1384, which is a blocking call.
> However, there is a race condition. If the mirrorError flag set by the
> `handleMirrorOutError` method is noticed by the check in line 1381, then the
> `BlockReceiver$PacketResponder` thread does not run the blocking network I/O
> call in line 1384. Instead, it goes to line 1481, then 1568, then 1632.
> According to the code around line 1602, this ACK contains `Status.ERROR`,
> which warns the client. If, on the contrary, the mirrorError flag is set only
> after the check in line 1381 has already passed, the
> `BlockReceiver$PacketResponder` thread blocks in line 1384. In our scenario,
> a data block packet is not sent to the mirror datanode because of the
> IOException, so the corresponding ACK packet will not be sent by the mirror
> datanode either. Therefore, the `BlockReceiver$PacketResponder` thread stays
> blocked there for a long time.
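> To make the interleaving concrete, here is a minimal standalone sketch of the
> same check-then-block pattern (all class and variable names are hypothetical
> illustrations, not the actual HDFS code): a responder thread checks a volatile
> flag and then commits to a blocking read on a stream that will never deliver
> data, while another thread sets the flag just after the check.
> {code:java}
> import java.io.IOException;
> import java.io.PipedInputStream;
> import java.io.PipedOutputStream;
>
> // Hypothetical stand-alone illustration of the race; not HDFS code.
> public class MirrorErrorRace {
>   // Same idea as BlockReceiver.mirrorError: a flag with no packet-level handshake.
>   static volatile boolean mirrorError = false;
>
>   public static void main(String[] args) throws Exception {
>     // The "mirror" side: kept open but never written to, so reads never return.
>     PipedOutputStream mirrorAckWriter = new PipedOutputStream();
>     PipedInputStream downstreamIn = new PipedInputStream(mirrorAckWriter);
>
>     Thread responder = new Thread(() -> {
>       try {
>         if (!mirrorError) { // the check, as at line 1381
>           // The other thread sets mirrorError right after this check, but we
>           // are already committed to the blocking read, as at line 1384.
>           int b = downstreamIn.read(); // blocks: the mirror never acks a packet it never got
>           System.out.println("got ack byte " + b);
>         } else {
>           System.out.println("mirrorError seen; would send Status.ERROR upstream");
>         }
>       } catch (IOException e) {
>         System.out.println("ack read failed: " + e);
>       }
>     });
>     responder.setDaemon(true); // so this demo JVM can still exit
>     responder.start();
>
>     Thread.sleep(100);  // let the responder pass the check and start the read
>     mirrorError = true; // the forwarding thread's handleMirrorOutError, as at line 461
>     responder.join(3000);
>     System.out.println("responder still blocked? " + responder.isAlive());
>   }
> }
> {code}
> In the real system the responder stays blocked until something else breaks
> the pipeline, which is why the client sees the 1-min hang instead of an
> immediate Status.ERROR ACK.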
> *Fix*
> The key is to avoid the problematic concurrency between
> `BlockReceiver#receivePacket` and the reading of the ACK packet (from the
> downstream mirror datanode) in `BlockReceiver$PacketResponder`. The simplest
> way to do this is: every time `BlockReceiver#receivePacket` successfully
> forwards a packet to the downstream mirror datanode, we grant
> `BlockReceiver$PacketResponder` one chance to check the mirrorError state and
> read the ACK with the blocking I/O call. This is reasonable because, if the
> datanode has not sent the packet, it is impossible for the
> `BlockReceiver$PacketResponder` to receive the corresponding ACK.
> The implementation only needs a semaphore in
> `BlockReceiver$PacketResponder` and does not affect the other components.
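> A minimal sketch of this idea follows (the field and method names are
> hypothetical and do not claim to match the actual patch in PR #2821; it also
> assumes the error path releases a permit, so the responder is guaranteed to
> wake up and notice mirrorError):
> {code:java}
> import java.util.concurrent.Semaphore;
>
> // Hypothetical sketch of packet-level synchronization between the
> // DataXceiver (receivePacket) thread and the PacketResponder thread.
> class MirrorAckGate {
>   // One permit per event the responder may act on: either a packet was
>   // successfully mirrored (so an ACK will eventually arrive) or the mirror
>   // write failed (so an ERROR ACK must be sent upstream).
>   private final Semaphore events = new Semaphore(0);
>   private volatile boolean mirrorError = false;
>
>   // Called by receivePacket() after packetReceiver.mirrorPacketTo(...)
>   // succeeds (around line 588).
>   void onPacketMirrored() {
>     events.release();
>   }
>
>   // Called from handleMirrorOutError(...) (around line 461) in addition to
>   // flipping the flag, so the responder cannot sleep through the error.
>   void onMirrorError() {
>     mirrorError = true;
>     events.release();
>   }
>
>   // Called by the PacketResponder loop before the blocking
>   // ack.readFields(downstreamIn) (around line 1384). Returns true only when
>   // it is safe to commit to the blocking read: a packet really was forwarded
>   // and no mirror error has been observed.
>   boolean awaitSafeToReadAck() throws InterruptedException {
>     events.acquire();    // wait for the next mirrored-packet or error event
>     return !mirrorError; // re-check the flag only after holding a permit
>   }
> }
> {code}
> With such a gate, the responder can no longer pass the mirrorError check and
> then block on an ACK that will never come: either the packet was really
> mirrored (so an ACK is expected) or the error is observed and Status.ERROR is
> sent upstream immediately.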
> *P.S.*
> We only discuss the reasoning about the symptom and the fix of this issue
> here. This bug is also related to some client-side behaviors, but that
> reasoning is a little more complex. We have the complete analysis
> ([https://docs.google.com/document/d/1Hq1qhbNFfS7y9zTNZ0VXsN3rxqExlMPaAqz4RfCurpE/edit?usp=sharing])
> for reference, which analyzes the packet receiving & sending threads of the
> datanode & client and explains how the aforementioned injection can leave
> these 4 threads stuck in a "deadlock".
> *Reproduction*
> Start HDFS (1 namenode, 2 datanodes) with the default configuration. Then
> run a client (we used the command `bin/hdfs dfs -copyFromLocal ./foo.txt
> /1.txt` in the terminal). For each data block packet the client sends to the
> datanode, the datanode forwards it at line 588 in `BlockReceiver.java`
> ([https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java#L588]).
> Inject a single IOException there.
> Most of the time, the concurrency condition needed to trigger this bug does
> not occur. The reliable way we use to reproduce it is to set `dfs.blocksize`
> to `1m` in `hdfs-site.xml`, then run `bin/hdfs dfs -copyFromLocal ./foo.txt
> /1.txt` where `./foo.txt` is a 15 MB file (generated with `fallocate -l
> 15000000 foo.txt`), and then perform the aforementioned injection at the
> 12th occurrence of
> [https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java#L747].
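> For reference, the same setup can be approximated in a unit-test style with
> `MiniDFSCluster` (a sketch assuming the Hadoop test utilities are on the
> classpath; the IOException injection at BlockReceiver.java line 588 still has
> to be done with an external fault-injection tool or a patched datanode):
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hdfs.DFSConfigKeys;
> import org.apache.hadoop.hdfs.MiniDFSCluster;
>
> // Hypothetical reproduction harness; the fault injection itself is external.
> public class Hdfs15925Repro {
>   public static void main(String[] args) throws Exception {
>     Configuration conf = new Configuration();
>     // Same effect as setting dfs.blocksize to 1m in hdfs-site.xml: small
>     // blocks make the problematic interleaving far more likely.
>     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 1024);
>
>     // 1 namenode + 2 datanodes, mirroring the reported deployment.
>     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
>         .numDataNodes(2)
>         .build();
>     try {
>       FileSystem fs = cluster.getFileSystem();
>       // ./foo.txt is the 15 MB file from `fallocate -l 15000000 foo.txt`.
>       // While this copy is running, inject a single IOException at
>       // BlockReceiver.java line 588 (the reliable timing is the 12th
>       // occurrence of DataXceiver.java#L747).
>       fs.copyFromLocalFile(new Path("./foo.txt"), new Path("/1.txt"));
>       System.out.println("copy finished without hanging");
>     } finally {
>       cluster.shutdown();
>     }
>   }
> }
> {code}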
--
This message was sent by Atlassian Jira
(v8.20.10#820010)