Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 a23107596 -> 5cb80992f
HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad. Contributed by Kihwal Lee.

(cherry picked from commit 99e5204ff5326430558b6f6fd9da7c44654c15d7)

Conflicts:
  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5cb80992
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5cb80992
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5cb80992

Branch: refs/heads/branch-2.7
Commit: 5cb80992fb7b5565433f8f217e89b6012e298df3
Parents: a231075
Author: Kihwal Lee <kih...@apache.org>
Authored: Wed Oct 7 10:24:55 2015 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Wed Oct 7 10:24:55 2015 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
 .../hdfs/server/datanode/BlockReceiver.java | 52 +++++++++++++++-
 .../server/datanode/DataNodeFaultInjector.java | 8 +++
 .../TestClientProtocolForPipelineRecovery.java | 63 ++++++++++++++++++++
 4 files changed, 124 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cb80992/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index aa348b2..7020c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -57,6 +57,9 @@ Release 2.7.2 - UNRELEASED
 pool to be scanned but there are suspicious blocks. (Colin Patrick McCabe via yliu) + HDFS-9178.
Slow datanode I/O can cause a wrong node to be marked bad + (kihwal) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cb80992/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 8fba2ad..3be8d4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -24,6 +24,7 @@ import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; +import java.io.EOFException; import java.io.FileDescriptor; import java.io.FileOutputStream; import java.io.IOException; @@ -134,6 +135,8 @@ class BlockReceiver implements Closeable { private DataOutputStream replyOut = null; private boolean pinning; + private long lastSentTime; + private long maxSendIdleTime; BlockReceiver(final ExtendedBlock block, final StorageType storageType, final DataInputStream in, @@ -160,7 +163,8 @@ class BlockReceiver implements Closeable { this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs; // For replaceBlock() calls response should be sent to avoid socketTimeout // at clients. 
So sending with the interval of 0.5 * socketTimeout - this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5); + final long readTimeout = datanode.getDnConf().socketTimeout; + this.responseInterval = (long) (readTimeout * 0.5); //for datanode, we have //1: clientName.length() == 0, and //2: stage == null or PIPELINE_SETUP_CREATE @@ -169,6 +173,12 @@ class BlockReceiver implements Closeable { || stage == BlockConstructionStage.TRANSFER_FINALIZED; this.pinning = pinning; + this.lastSentTime = Time.monotonicNow(); + // Downstream will timeout in readTimeout on receiving the next packet. + // If there is no data traffic, a heartbeat packet is sent at + // the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be + // the threshold for detecting congestion. + this.maxSendIdleTime = (long) (readTimeout * 0.9); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ": " + block + "\n isClient =" + isClient + ", clientname=" + clientname @@ -351,6 +361,25 @@ class BlockReceiver implements Closeable { } } + synchronized void setLastSentTime(long sentTime) { + lastSentTime = sentTime; + } + + /** + * It can return false if + * - upstream did not send packet for a long time + * - a packet was received but got stuck in local disk I/O. + * - a packet was received but got stuck on send to mirror. + */ + synchronized boolean packetSentInTime() { + long diff = Time.monotonicNow() - lastSentTime; + if (diff > maxSendIdleTime) { + LOG.info("A packet was last sent " + diff + " milliseconds ago."); + return false; + } + return true; + } + /** * Flush block data and metadata files to disk. * @throws IOException @@ -514,13 +543,21 @@ class BlockReceiver implements Closeable { lastPacketInBlock, offsetInBlock, Status.SUCCESS); } + // Drop heartbeat for testing. 
+ if (seqno < 0 && len == 0 && + DataNodeFaultInjector.get().dropHeartbeatPacket()) { + return 0; + } + //First write the packet to the mirror: if (mirrorOut != null && !mirrorError) { try { long begin = Time.monotonicNow(); packetReceiver.mirrorPacketTo(mirrorOut); mirrorOut.flush(); - long duration = Time.monotonicNow() - begin; + long now = Time.monotonicNow(); + setLastSentTime(now); + long duration = now - begin; if (duration > datanodeSlowLogThresholdMs) { LOG.warn("Slow BlockReceiver write packet to mirror took " + duration + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); @@ -1298,6 +1335,17 @@ class BlockReceiver implements Closeable { } catch (IOException ioe) { if (Thread.interrupted()) { isInterrupted = true; + } else if (ioe instanceof EOFException && !packetSentInTime()) { + // The downstream error was caused by upstream including this + // node not sending packet in time. Let the upstream determine + // who is at fault. If the immediate upstream node thinks it + // has sent a packet in time, this node will be reported as bad. + // Otherwise, the upstream node will propagate the error up by + // closing the connection. + LOG.warn("The downstream error might be due to congestion in " + + "upstream including this node. 
Propagating the error: ", + ioe); + throw ioe; } else { // continue to run even if can not read from mirror // notify client of the error http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cb80992/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 65f0506..46ec3ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -36,9 +36,17 @@ public class DataNodeFaultInjector { return instance; } + public static void set(DataNodeFaultInjector injector) { + instance = injector; + } + public void getHdfsBlocksMetadata() {} public void writeBlockAfterFlush() throws IOException {} public void sendShortCircuitShmResponse() throws IOException {} + + public boolean dropHeartbeatPacket() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cb80992/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index 04853bd..d71bc4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -21,10 +21,13 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSAdmin; @@ -162,6 +165,66 @@ public class TestClientProtocolForPipelineRecovery { } } + @Test + public void testPacketTransmissionDelay() throws Exception { + // Make the first datanode to not relay heartbeat packet. + DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() { + @Override + public boolean dropHeartbeatPacket() { + return true; + } + }; + DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector.set(dnFaultInjector); + + // Setting the timeout to be 3 seconds. Normally heartbeat packet + // would be sent every 1.5 seconds if there is no data traffic. 
+ Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000"); + MiniDFSCluster cluster = null; + + try { + int numDataNodes = 2; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + + FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2); + out.write(0x31); + out.hflush(); + + DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream(); + + // original pipeline + DatanodeInfo[] orgNodes = dfsOut.getPipeline(); + + // Cause the second datanode to timeout on reading packet + Thread.sleep(3500); + out.write(0x32); + out.hflush(); + + // new pipeline + DatanodeInfo[] newNodes = dfsOut.getPipeline(); + out.close(); + + boolean contains = false; + for (int i = 0; i < newNodes.length; i++) { + if (orgNodes[0].getXferAddr().equals(newNodes[i].getXferAddr())) { + throw new IOException("The first datanode should have been replaced."); + } + if (orgNodes[1].getXferAddr().equals(newNodes[i].getXferAddr())) { + contains = true; + } + } + Assert.assertTrue(contains); + } finally { + DataNodeFaultInjector.set(oldDnInjector); + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** * Test recovery on restart OOB message. It also tests the delivery of * OOB ack originating from the primary datanode. Since there is only