Repository: hadoop
Updated Branches:
  refs/heads/trunk 61b3547c4 -> 99e5204ff


HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad. 
Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/99e5204f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/99e5204f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/99e5204f

Branch: refs/heads/trunk
Commit: 99e5204ff5326430558b6f6fd9da7c44654c15d7
Parents: 61b3547
Author: Kihwal Lee <kih...@apache.org>
Authored: Wed Oct 7 10:17:17 2015 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Wed Oct 7 10:17:17 2015 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hdfs/server/datanode/BlockReceiver.java     | 52 +++++++++++++++-
 .../server/datanode/DataNodeFaultInjector.java  |  8 +++
 .../TestClientProtocolForPipelineRecovery.java  | 63 ++++++++++++++++++++
 4 files changed, 124 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5204f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e495497..33a78f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2028,6 +2028,9 @@ Release 2.7.2 - UNRELEASED
     pool to be scanned but there are suspicious blocks. (Colin Patrick McCabe
     via yliu)
 
+    HDFS-9178. Slow datanode I/O can cause a wrong node to be marked bad
+    (kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5204f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index eec2b2d..4c40e83 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -136,6 +137,8 @@ class BlockReceiver implements Closeable {
   private DataOutputStream replyOut = null;
   
   private boolean pinning;
+  private long lastSentTime;
+  private long maxSendIdleTime;
 
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
@@ -162,7 +165,8 @@ class BlockReceiver implements Closeable {
       this.datanodeSlowLogThresholdMs = 
datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
       // For replaceBlock() calls response should be sent to avoid 
socketTimeout
       // at clients. So sending with the interval of 0.5 * socketTimeout
-      this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 
0.5);
+      final long readTimeout = datanode.getDnConf().socketTimeout;
+      this.responseInterval = (long) (readTimeout * 0.5);
       //for datanode, we have
       //1: clientName.length() == 0, and
       //2: stage == null or PIPELINE_SETUP_CREATE
@@ -171,6 +175,12 @@ class BlockReceiver implements Closeable {
           || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
       this.pinning = pinning;
+      this.lastSentTime = Time.monotonicNow();
+      // Downstream will timeout in readTimeout on receiving the next packet.
+      // If there is no data traffic, a heartbeat packet is sent at
+      // the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
+      // the threshold for detecting congestion.
+      this.maxSendIdleTime = (long) (readTimeout * 0.9);
       if (LOG.isDebugEnabled()) {
         LOG.debug(getClass().getSimpleName() + ": " + block
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
@@ -357,6 +367,25 @@ class BlockReceiver implements Closeable {
     }
   }
 
+  synchronized void setLastSentTime(long sentTime) {
+    lastSentTime = sentTime;
+  }
+
+  /**
+   * It can return false if
+   * - upstream did not send packet for a long time
+   * - a packet was received but got stuck in local disk I/O.
+   * - a packet was received but got stuck on send to mirror.
+   */
+  synchronized boolean packetSentInTime() {
+    long diff = Time.monotonicNow() - lastSentTime;
+    if (diff > maxSendIdleTime) {
+      LOG.info("A packet was last sent " + diff + " milliseconds ago.");
+      return false;
+    }
+    return true;
+  }
+
   /**
    * Flush block data and metadata files to disk.
    * @throws IOException
@@ -520,13 +549,21 @@ class BlockReceiver implements Closeable {
           lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
+    // Drop heartbeat for testing.
+    if (seqno < 0 && len == 0 &&
+        DataNodeFaultInjector.get().dropHeartbeatPacket()) {
+      return 0;
+    }
+
     //First write the packet to the mirror:
     if (mirrorOut != null && !mirrorError) {
       try {
         long begin = Time.monotonicNow();
         packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
-        long duration = Time.monotonicNow() - begin;
+        long now = Time.monotonicNow();
+        setLastSentTime(now);
+        long duration = now - begin;
         if (duration > datanodeSlowLogThresholdMs) {
           LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
               + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -1296,6 +1333,17 @@ class BlockReceiver implements Closeable {
           } catch (IOException ioe) {
             if (Thread.interrupted()) {
               isInterrupted = true;
+            } else if (ioe instanceof EOFException && !packetSentInTime()) {
+              // The downstream error was caused by upstream including this
+              // node not sending packet in time. Let the upstream determine
+              // who is at fault.  If the immediate upstream node thinks it
+              // has sent a packet in time, this node will be reported as bad.
+              // Otherwise, the upstream node will propagate the error up by
+              // closing the connection.
+              LOG.warn("The downstream error might be due to congestion in " +
+                  "upstream including this node. Propagating the error: ",
+                  ioe);
+              throw ioe;
             } else {
               // continue to run even if can not read from mirror
               // notify client of the error

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5204f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 65f0506..46ec3ae 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -36,9 +36,17 @@ public class DataNodeFaultInjector {
     return instance;
   }
 
+  public static void set(DataNodeFaultInjector injector) {
+    instance = injector;
+  }
+
   public void getHdfsBlocksMetadata() {}
 
   public void writeBlockAfterFlush() throws IOException {}
 
   public void sendShortCircuitShmResponse() throws IOException {}
+
+  public boolean dropHeartbeatPacket() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/99e5204f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index b83157d..77cfb7c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -21,11 +21,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -160,6 +163,66 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  @Test
+  public void testPacketTransmissionDelay() throws Exception {
+    // Make the first datanode to not relay heartbeat packet.
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override
+      public boolean dropHeartbeatPacket() {
+        return true;
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(dnFaultInjector);
+
+    // Setting the timeout to be 3 seconds. Normally heartbeat packet
+    // would be sent every 1.5 seconds if there is no data traffic.
+    Configuration conf = new HdfsConfiguration();
+    conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000");
+    MiniDFSCluster cluster = null;
+
+    try {
+      int numDataNodes = 2;
+      cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), 
(short)2);
+      out.write(0x31);
+      out.hflush();
+
+      DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
+
+      // original pipeline
+      DatanodeInfo[] orgNodes = dfsOut.getPipeline();
+
+      // Cause the second datanode to timeout on reading packet
+      Thread.sleep(3500);
+      out.write(0x32);
+      out.hflush();
+
+      // new pipeline
+      DatanodeInfo[] newNodes = dfsOut.getPipeline();
+      out.close();
+
+      boolean contains = false;
+      for (int i = 0; i < newNodes.length; i++) {
+        if (orgNodes[0].getXferAddr().equals(newNodes[i].getXferAddr())) {
+          throw new IOException("The first datanode should have been 
replaced.");
+        }
+        if (orgNodes[1].getXferAddr().equals(newNodes[i].getXferAddr())) {
+          contains = true;
+        }
+      }
+      Assert.assertTrue(contains);
+    } finally {
+      DataNodeFaultInjector.set(oldDnInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Test recovery on restart OOB message. It also tests the delivery of 
    * OOB ack originating from the primary datanode. Since there is only

Reply via email to