This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9b6906f  HDFS-14651. DeadNodeDetector checks dead node periodically. Contributed by Lisheng Sun.
9b6906f is described below

commit 9b6906fe914829f50076c2291dba59d425475d7b
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Fri Nov 22 10:53:55 2019 +0800

    HDFS-14651. DeadNodeDetector checks dead node periodically. Contributed by Lisheng Sun.
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |   2 +-
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   | 308 ++++++++++++++++++++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  22 ++
 .../src/main/resources/hdfs-default.xml            |  40 +++
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 154 +++++++++--
 5 files changed, 504 insertions(+), 22 deletions(-)
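
For readers who want to try the feature, the sketch below (not part of this commit) shows how a client could enable the shared dead node detector and tune the probing behavior introduced here. The key names and default values come from the HdfsClientConfigKeys changes in this patch; the file path and the FileSystem calls are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DeadNodeDetectionExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Turn on the client-side dead node detector (disabled by default).
        conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
        // Probe suspected dead nodes every 60 seconds (the default interval).
        conf.setLong("dfs.client.deadnode.detection.probe.deadnode.interval.ms", 60000L);
        // Queue at most 100 dead nodes waiting for a probe (the default).
        conf.setInt("dfs.client.deadnode.detection.deadnode.queue.max", 100);
        // Give each probe RPC 20 seconds before treating it as failed (the default).
        conf.setLong("dfs.client.deadnode.detection.probe.connection.timeout.ms", 20000L);

        // All DFS clients created from this configuration share one detector.
        try (FileSystem fs = FileSystem.get(conf)) {
          fs.exists(new Path("/tmp"));  // illustrative read exercising the client
        }
      }
    }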

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index abb039c..6ee5277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -150,7 +150,7 @@ public class ClientContext {
         conf.getWriteByteArrayManagerConf());
     this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
     if (deadNodeDetectionEnabled && deadNodeDetector == null) {
-      deadNodeDetector = new DeadNodeDetector(name);
+      deadNodeDetector = new DeadNodeDetector(name, config);
       deadNodeDetectorThr = new Daemon(deadNodeDetector);
       deadNodeDetectorThr.start();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index 1ac29a7..2fe7cf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -17,13 +17,40 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
 /**
  * Detect the dead nodes in advance, and share this information among all the
@@ -48,10 +75,12 @@ public class DeadNodeDetector implements Runnable {
    */
   private String name;
 
+  private Configuration conf;
+
   /**
    * Dead nodes shared by all the DFSInputStreams of the client.
    */
-  private final ConcurrentHashMap<String, DatanodeInfo> deadNodes;
+  private final Map<String, DatanodeInfo> deadNodes;
 
   /**
    * Record dead nodes by one DFSInputStream. When dead node is not used by one
@@ -59,10 +88,67 @@ public class DeadNodeDetector implements Runnable {
    * DFSInputStream does not include any dead node, remove DFSInputStream from
    * dfsInputStreamNodes.
    */
-  private final ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>
+  private final Map<DFSInputStream, HashSet<DatanodeInfo>>
           dfsInputStreamNodes;
 
   /**
+   * Datanodes that are being probed.
+   */
+  private Map<String, DatanodeInfo> probeInProg =
+      new ConcurrentHashMap<String, DatanodeInfo>();
+
+  /**
+   * The last time dead nodes were checked.
+   */
+  private long lastDetectDeadTS = 0;
+
+  /**
+   * Interval time in milliseconds for probing dead node behavior.
+   */
+  private long deadNodeDetectInterval = 0;
+
+  /**
+   * The max queue size of probing dead node.
+   */
+  private int maxDeadNodesProbeQueueLen = 0;
+
+  /**
+   * Connection timeout for probing dead node in milliseconds.
+   */
+  private long probeConnectionTimeoutMs;
+
+  /**
+   * The dead node probe queue.
+   */
+  private Queue<DatanodeInfo> deadNodesProbeQueue;
+
+  /**
+   * The thread pool of probing dead node.
+   */
+  private ExecutorService probeDeadNodesThreadPool;
+
+  /**
+   * The scheduler thread of probing dead node.
+   */
+  private Thread probeDeadNodesSchedulerThr;
+
+  /**
+   * The thread pool for sending RPC probe requests to datanodes. A datanode
+   * can hang and stop responding to the client for a while; such nodes would
+   * otherwise fill the probe thread pool and block probing of normal nodes.
+   */
+  private ExecutorService rpcThreadPool;
+
+  private int socketTimeout;
+
+  /**
+   * The type of probe.
+   */
+  private enum ProbeType {
+    CHECK_DEAD
+  }
+
+  /**
    * The state of DeadNodeDetector.
    */
   private enum State {
@@ -71,12 +157,40 @@ public class DeadNodeDetector implements Runnable {
 
   private State state;
 
-  public DeadNodeDetector(String name) {
+  public DeadNodeDetector(String name, Configuration conf) {
+    this.conf = new Configuration(conf);
     this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
     this.dfsInputStreamNodes =
         new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
     this.name = name;
 
+    deadNodeDetectInterval = conf.getLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT);
+    socketTimeout =
+        conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
+    maxDeadNodesProbeQueueLen =
+        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
+    probeConnectionTimeoutMs = conf.getLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
+
+    this.deadNodesProbeQueue =
+        new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
+
+    int deadNodeDetectDeadThreads =
+        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT);
+    int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT);
+    probeDeadNodesThreadPool = Executors.newFixedThreadPool(
+        deadNodeDetectDeadThreads, new Daemon.DaemonFactory());
+    rpcThreadPool =
+        Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory());
+
+    startProbeScheduler();
+
     LOG.info("Start dead node detector for DFSClient {}.", this.name);
     state = State.INIT;
   }
@@ -91,6 +205,9 @@ public class DeadNodeDetector implements Runnable {
       case INIT:
         init();
         break;
+      case CHECK_DEAD:
+        checkDeadNodes();
+        break;
       case IDLE:
         idle();
         break;
@@ -106,6 +223,134 @@ public class DeadNodeDetector implements Runnable {
     }
   }
 
+  /**
+   * Start probe dead node thread.
+   */
+  private void startProbeScheduler() {
+    probeDeadNodesSchedulerThr =
+            new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
+    probeDeadNodesSchedulerThr.setDaemon(true);
+    probeDeadNodesSchedulerThr.start();
+  }
+
+  /**
+   * Probe datanodes by probe type.
+   */
+  private void scheduleProbe(ProbeType type) {
+    LOG.debug("Schedule probe datanode for probe type: {}.", type);
+    DatanodeInfo datanodeInfo = null;
+    if (type == ProbeType.CHECK_DEAD) {
+      while ((datanodeInfo = deadNodesProbeQueue.poll()) != null) {
+        if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
+          LOG.debug("The datanode {} is already contained in probe queue, " +
+              "skip to add it.", datanodeInfo);
+          continue;
+        }
+        probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+        Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD);
+        probeDeadNodesThreadPool.execute(probe);
+      }
+    }
+  }
+
+  /**
+   * Request the data node through rpc, and determine the data node status based
+   * on the returned result.
+   */
+  class Probe implements Runnable {
+    private DeadNodeDetector deadNodeDetector;
+    private DatanodeInfo datanodeInfo;
+    private ProbeType type;
+
+    Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo,
+        ProbeType type) {
+      this.deadNodeDetector = deadNodeDetector;
+      this.datanodeInfo = datanodeInfo;
+      this.type = type;
+    }
+
+    public DatanodeInfo getDatanodeInfo() {
+      return datanodeInfo;
+    }
+
+    public ProbeType getType() {
+      return type;
+    }
+
+    @Override
+    public void run() {
+      LOG.debug("Check node: {}, type: {}.", datanodeInfo, type);
+      try {
+        final ClientDatanodeProtocol proxy =
+            DFSUtilClient.createClientDatanodeProtocolProxy(datanodeInfo,
+                deadNodeDetector.conf, socketTimeout, true);
+
+        Future<DatanodeLocalInfo> future = rpcThreadPool.submit(new Callable() {
+          @Override
+          public DatanodeLocalInfo call() throws Exception {
+            return proxy.getDatanodeInfo();
+          }
+        });
+
+        try {
+          future.get(probeConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type,
+              e);
+          deadNodeDetector.probeCallBack(this, false);
+          return;
+        } finally {
+          future.cancel(true);
+        }
+        deadNodeDetector.probeCallBack(this, true);
+        return;
+      } catch (Exception e) {
+        LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type,
+            e);
+      }
+
+      deadNodeDetector.probeCallBack(this, false);
+    }
+  }
+
+  /**
+   * Handle the datanode according to the probe result. When ProbeType is
+   * CHECK_DEAD, remove the datanode from deadNodes if the probe succeeds.
+   */
+  private void probeCallBack(Probe probe, boolean success) {
+    LOG.debug("Probe datanode: {} result: {}, type: {}",
+        probe.getDatanodeInfo(), success, probe.getType());
+    probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
+    if (success) {
+      if (probe.getType() == ProbeType.CHECK_DEAD) {
+        LOG.info("Remove the node out from dead node list: {}. ",
+            probe.getDatanodeInfo());
+        removeNodeFromDeadNode(probe.getDatanodeInfo());
+      }
+    }
+  }
+
+  /**
+   * Check dead node periodically.
+   */
+  private void checkDeadNodes() {
+    long ts = Time.monotonicNow();
+    if (ts - lastDetectDeadTS > deadNodeDetectInterval) {
+      Set<DatanodeInfo> datanodeInfos = clearAndGetDetectedDeadNodes();
+      for (DatanodeInfo datanodeInfo : datanodeInfos) {
+        LOG.debug("Add dead node to check: {}.", datanodeInfo);
+        if (!deadNodesProbeQueue.offer(datanodeInfo)) {
+          LOG.debug("Skip to add dead node {} to check " +
+              "since the probe queue is full.", datanodeInfo);
+          break;
+        }
+      }
+      lastDetectDeadTS = ts;
+    }
+
+    state = State.IDLE;
+  }
+
   private void idle() {
     try {
       Thread.sleep(IDLE_SLEEP_MS);
@@ -113,11 +358,11 @@ public class DeadNodeDetector implements Runnable {
 
     }
 
-    state = State.IDLE;
+    state = State.CHECK_DEAD;
   }
 
   private void init() {
-    state = State.IDLE;
+    state = State.CHECK_DEAD;
   }
 
   private void addToDead(DatanodeInfo datanodeInfo) {
@@ -128,6 +373,14 @@ public class DeadNodeDetector implements Runnable {
     return deadNodes.containsKey(datanodeInfo.getDatanodeUuid());
   }
 
+  private void removeFromDead(DatanodeInfo datanodeInfo) {
+    deadNodes.remove(datanodeInfo.getDatanodeUuid());
+  }
+
+  public Queue<DatanodeInfo> getDeadNodesProbeQueue() {
+    return deadNodesProbeQueue;
+  }
+
   /**
    * Add datanode in deadNodes and dfsInputStreamNodes. The node is considered
    * to dead node. The dead node is shared by all the DFSInputStreams in the
@@ -182,4 +435,49 @@ public class DeadNodeDetector implements Runnable {
       }
     }
   }
+
+  /**
+   * Remove dead node from dfsInputStreamNodes#dfsInputStream and deadNodes.
+   */
+  public synchronized void removeNodeFromDeadNode(DatanodeInfo datanodeInfo) {
+    for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry :
+            dfsInputStreamNodes.entrySet()) {
+      Set<DatanodeInfo> datanodeInfos = entry.getValue();
+      if (datanodeInfos.remove(datanodeInfo)) {
+        DFSInputStream dfsInputStream = entry.getKey();
+        dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
+      }
+    }
+
+    removeFromDead(datanodeInfo);
+  }
+
+  private static void probeSleep(long time) {
+    try {
+      Thread.sleep(time);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Schedule probe data node.
+   */
+  static class ProbeScheduler implements Runnable {
+    private DeadNodeDetector deadNodeDetector;
+    private ProbeType type;
+
+    ProbeScheduler(DeadNodeDetector deadNodeDetector, ProbeType type) {
+      this.deadNodeDetector = deadNodeDetector;
+      this.type = type;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        deadNodeDetector.scheduleProbe(type);
+        probeSleep(deadNodeDetector.deadNodeDetectInterval);
+      }
+    }
+  }
 }
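
Taken together, the new code above forms a small state machine: run() cycles INIT -> CHECK_DEAD -> IDLE -> CHECK_DEAD, checkDeadNodes() feeds suspected nodes into deadNodesProbeQueue at most once per deadNodeDetectInterval, and the ProbeScheduler thread drains the queue into the probe thread pool, where each Probe issues a getDatanodeInfo RPC bounded by probeConnectionTimeoutMs. A condensed, self-contained sketch of that flow (not the committed class; hard-coded sizes stand in for the configuration keys):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class DetectorFlowSketch implements Runnable {
      private enum State { INIT, CHECK_DEAD, IDLE }
      private State state = State.INIT;
      private final BlockingQueue<String> probeQueue = new ArrayBlockingQueue<>(100);
      private final ExecutorService probePool = Executors.newFixedThreadPool(10);

      @Override
      public void run() {                       // main detector loop
        while (true) {
          switch (state) {
          case INIT:                            // first pass goes straight to checking
            state = State.CHECK_DEAD;
            break;
          case CHECK_DEAD:                      // enqueue known dead nodes for re-probing;
            probeQueue.offer("datanode-uuid");  // offer() drops nodes when the queue is full
            state = State.IDLE;
            break;
          case IDLE:                            // sleep briefly, then check again
            sleepQuietly(2000);
            state = State.CHECK_DEAD;
            break;
          }
        }
      }

      void scheduleProbe() {                    // run by a separate scheduler thread
        String node;
        while ((node = probeQueue.poll()) != null) {
          final String uuid = node;
          // Each probe would issue getDatanodeInfo() over RPC with a timeout
          // and remove the node from the dead list if it answers.
          probePool.execute(() -> System.out.println("probing " + uuid));
        }
      }

      private static void sleepQuietly(long ms) {
        try {
          Thread.sleep(ms);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }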
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e32a3bb..4218cc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -156,6 +156,28 @@ public interface HdfsClientConfigKeys {
           "dfs.client.deadnode.detection.enabled";
   boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY =
+      "dfs.client.deadnode.detection.deadnode.queue.max";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
+      "dfs.client.deadnode.detection.probe.connection.timeout.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT =
+      20000;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY =
+      "dfs.client.deadnode.detection.probe.deadnode.threads";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY =
+      "dfs.client.deadnode.detection.rpc.threads";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20;
+
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY =
+      "dfs.client.deadnode.detection.probe.deadnode.interval.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT =
+      60 * 1000; // 60s
+
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d2103fb..684b617 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2996,6 +2996,46 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.client.deadnode.detection.deadnode.queue.max</name>
+    <value>100</value>
+    <description>
+      The max queue size of probing dead node.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
+    <value>10</value>
+    <description>
+      The maximum number of threads to use for probing dead node.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.rpc.threads</name>
+    <value>20</value>
+    <description>
+      The maximum number of threads to use for RPC calls that recheck the liveness of dead nodes.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.deadnode.interval.ms</name>
+    <value>60000</value>
+    <description>
+      Interval time in milliseconds for probing dead node behavior.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name>
+    <value>20000</value>
+    <description>
+      Connection timeout for probing dead node in milliseconds.
+    </description>
+  </property>
+
 <property>
   <name>dfs.namenode.lease-recheck-interval-ms</name>
   <value>2000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index 9b997ab..da800f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -17,19 +17,24 @@
  */
 package org.apache.hadoop.hdfs;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -121,21 +126,7 @@ public class TestDeadNodeDetection {
 
     FileSystem fs = cluster.getFileSystem();
     Path filePath = new Path("/testDeadNodeMultipleDFSInputStream");
-
-    // 256 bytes data chunk for writes
-    byte[] bytes = new byte[256];
-    for (int index = 0; index < bytes.length; index++) {
-      bytes[index] = '0';
-    }
-
-    // File with a 512 bytes block size
-    FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
-
-    // Write a block to DN (2x256bytes).
-    out.write(bytes);
-    out.write(bytes);
-    out.hflush();
-    out.close();
+    createFile(fs, filePath);
 
     String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
     FSDataInputStream in1 = fs.open(filePath);
@@ -170,7 +161,7 @@ public class TestDeadNodeDetection {
     } finally {
       in1.close();
       in2.close();
-      fs.delete(new Path("/testDeadNodeMultipleDFSInputStream"), true);
+      deleteFile(fs, filePath);
       // check the dead node again here, the dead node is expected be removed
       assertEquals(0, dfsClient1.getDeadNodes(din1).size());
       assertEquals(0, dfsClient2.getDeadNodes(din2).size());
@@ -180,4 +171,135 @@ public class TestDeadNodeDetection {
           .clearAndGetDetectedDeadNodes().size());
     }
   }
+
+  @Test
+  public void testDeadNodeDetectionDeadNodeRecovery() throws Exception {
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+        1000);
+    // We'll be using a 512 bytes block size just for tests
+    // so making sure the checksum bytes too match it.
+    conf.setInt("io.bytes.per.checksum", 512);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeDetectionDeadNodeRecovery");
+    createFile(fs, filePath);
+
+    // Remove three DNs,
+    MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+
+      assertEquals(3, dfsClient.getDeadNodes(din).size());
+      assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+
+      cluster.restartDataNode(one, true);
+      waitForDeadNodeRecovery(din);
+      assertEquals(2, dfsClient.getDeadNodes(din).size());
+      assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    } finally {
+      in.close();
+      deleteFile(fs, filePath);
+      assertEquals(0, dfsClient.getDeadNodes(din).size());
+      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+        1000);
+    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
+
+    // We'll be using a 512 bytes block size just for tests
+    // so making sure the checksum bytes too match it.
+    conf.setInt("io.bytes.per.checksum", 512);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("testDeadNodeDetectionMaxDeadNodesProbeQueue");
+    createFile(fs, filePath);
+
+    // Remove three DNs,
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+
+      Thread.sleep(1500);
+      Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector()
+          .getDeadNodesProbeQueue().size()
+          + dfsClient.getDeadNodes(din).size()) <= 4);
+    } finally {
+      in.close();
+      deleteFile(fs, filePath);
+    }
+  }
+
+  private void createFile(FileSystem fs, Path filePath) throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      // 256 bytes data chunk for writes
+      byte[] bytes = new byte[256];
+      for (int index = 0; index < bytes.length; index++) {
+        bytes[index] = '0';
+      }
+
+      // File with a 512 bytes block size
+      out = fs.create(filePath, true, 4096, (short) 3, 512);
+
+      // Write a block to all 3 DNs (2x256bytes).
+      out.write(bytes);
+      out.write(bytes);
+      out.hflush();
+
+    } finally {
+      out.close();
+    }
+  }
+
+  private void deleteFile(FileSystem fs, Path filePath) throws IOException {
+    fs.delete(filePath, true);
+  }
+
+  private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          if (din.getDFSClient().getDeadNodes(din).size() == 2) {
+            return true;
+          }
+        } catch (Exception e) {
+          // Ignore the exception
+        }
+
+        return false;
+      }
+    }, 5000, 100000);
+  }
 }
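
One style note on waitForDeadNodeRecovery above: since GenericTestUtils.waitFor takes a Supplier<Boolean>, the anonymous class could equally be written as a lambda on Java 8. A sketch, functionally the same as the committed version:

    private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception {
      GenericTestUtils.waitFor(() -> {
        try {
          // Wait until exactly two dead nodes remain after one DN restarts.
          return din.getDFSClient().getDeadNodes(din).size() == 2;
        } catch (Exception e) {
          return false;  // ignore transient errors and keep polling
        }
      }, 5000, 100000);
    }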

