This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/trunk by this push:
     new c8bef4d  HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.
c8bef4d is described below

commit c8bef4d6a6d7d5affd00cff6ea4a2e2ef778050e
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Wed Nov 27 10:57:20 2019 +0800

    HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.
---
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   | 169 +++++++++++++++++----
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  13 ++
 .../src/main/resources/hdfs-default.xml            |  24 +++
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 104 +++++++++++--
 4 files changed, 264 insertions(+), 46 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index 2fe7cf8..ce50547 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -48,8 +49,14 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
 /**
@@ -83,13 +90,13 @@ public class DeadNodeDetector implements Runnable {
   private final Map<String, DatanodeInfo> deadNodes;
 
   /**
-   * Record dead nodes by one DFSInputStream. When dead node is not used by one
-   * DFSInputStream, remove it from dfsInputStreamNodes#DFSInputStream. If
-   * DFSInputStream does not include any dead node, remove DFSInputStream from
-   * dfsInputStreamNodes.
+   * Record suspect and dead nodes by one DFSInputStream. When node is not used
+   * by one DFSInputStream, remove it from suspectAndDeadNodes#DFSInputStream.
+   * If DFSInputStream does not include any node, remove DFSInputStream from
+   * suspectAndDeadNodes.
    */
   private final Map<DFSInputStream, HashSet<DatanodeInfo>>
-      dfsInputStreamNodes;
+      suspectAndDeadNodes;
 
   /**
    * Datanodes that is being probed.
@@ -108,11 +115,21 @@ public class DeadNodeDetector implements Runnable {
   private long deadNodeDetectInterval = 0;
 
   /**
+   * Interval time in milliseconds for probing suspect node behavior.
+   */
+  private long suspectNodeDetectInterval = 0;
+
+  /**
    * The max queue size of probing dead node.
    */
   private int maxDeadNodesProbeQueueLen = 0;
 
   /**
+   * The max queue size of probing suspect node.
+   */
+  private int maxSuspectNodesProbeQueueLen;
+
+  /**
    * Connection timeout for probing dead node in milliseconds.
    */
   private long probeConnectionTimeoutMs;
@@ -123,16 +140,31 @@ public class DeadNodeDetector implements Runnable {
   private Queue<DatanodeInfo> deadNodesProbeQueue;
 
   /**
+   * The suspect node probe queue.
+   */
+  private Queue<DatanodeInfo> suspectNodesProbeQueue;
+
+  /**
    * The thread pool of probing dead node.
    */
   private ExecutorService probeDeadNodesThreadPool;
 
   /**
+   * The thread pool of probing suspect node.
+   */
+  private ExecutorService probeSuspectNodesThreadPool;
+
+  /**
    * The scheduler thread of probing dead node.
    */
   private Thread probeDeadNodesSchedulerThr;
 
   /**
+   * The scheduler thread of probing suspect node.
+   */
+  private Thread probeSuspectNodesSchedulerThr;
+
+  /**
    * The thread pool of probing datanodes' rpc request. Sometimes the data node
    * can hang and not respond to the client in a short time. And these node will
    * filled with probe thread pool and block other normal node probing.
@@ -145,7 +177,7 @@ public class DeadNodeDetector implements Runnable {
    * The type of probe.
    */
   private enum ProbeType {
-    CHECK_DEAD
+    CHECK_DEAD, CHECK_SUSPECT
   }
 
   /**
@@ -155,41 +187,61 @@ public class DeadNodeDetector implements Runnable {
     INIT, CHECK_DEAD, IDLE, ERROR
   }
 
+  /**
+   * Disabled start probe suspect/dead thread for the testing.
+   */
+  private static volatile boolean disabledProbeThreadForTest = false;
+
   private State state;
 
   public DeadNodeDetector(String name, Configuration conf) {
     this.conf = new Configuration(conf);
     this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
-    this.dfsInputStreamNodes =
+    this.suspectAndDeadNodes =
         new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
     this.name = name;
 
     deadNodeDetectInterval = conf.getLong(
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT);
+    suspectNodeDetectInterval = conf.getLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
     socketTimeout =
         conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
     maxDeadNodesProbeQueueLen =
         conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
            DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
+    maxSuspectNodesProbeQueueLen =
+        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
     probeConnectionTimeoutMs = conf.getLong(
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
 
     this.deadNodesProbeQueue =
         new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
+    this.suspectNodesProbeQueue =
+        new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
 
     int deadNodeDetectDeadThreads =
         conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
            DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT);
+    int suspectNodeDetectDeadThreads = conf.getInt(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT);
     int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT);
     probeDeadNodesThreadPool = Executors.newFixedThreadPool(
         deadNodeDetectDeadThreads, new Daemon.DaemonFactory());
+    probeSuspectNodesThreadPool = Executors.newFixedThreadPool(
+        suspectNodeDetectDeadThreads, new Daemon.DaemonFactory());
     rpcThreadPool =
         Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory());
 
-    startProbeScheduler();
+    if (!disabledProbeThreadForTest) {
+      startProbeScheduler();
+    }
 
     LOG.info("Start dead node detector for DFSClient {}.", this.name);
     state = State.INIT;
@@ -223,14 +275,25 @@ public class DeadNodeDetector implements Runnable {
     }
   }
 
+  @VisibleForTesting
+  static void disabledProbeThreadForTest() {
+    disabledProbeThreadForTest = true;
+  }
+
   /**
-   * Start probe dead node thread.
+   * Start probe dead node and suspect node thread.
    */
-  private void startProbeScheduler() {
+  @VisibleForTesting
+  void startProbeScheduler() {
     probeDeadNodesSchedulerThr =
         new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
     probeDeadNodesSchedulerThr.setDaemon(true);
     probeDeadNodesSchedulerThr.start();
+
+    probeSuspectNodesSchedulerThr =
+        new Thread(new ProbeScheduler(this, ProbeType.CHECK_SUSPECT));
+    probeSuspectNodesSchedulerThr.setDaemon(true);
+    probeSuspectNodesSchedulerThr.start();
   }
 
   /**
@@ -250,6 +313,15 @@
         Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD);
         probeDeadNodesThreadPool.execute(probe);
       }
+    } else if (type == ProbeType.CHECK_SUSPECT) {
+      while ((datanodeInfo = suspectNodesProbeQueue.poll()) != null) {
+        if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
+          continue;
+        }
+        probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+        Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_SUSPECT);
+        probeSuspectNodesThreadPool.execute(probe);
+      }
     }
   }
 
@@ -263,7 +335,7 @@
     private ProbeType type;
 
     Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo,
-       ProbeType type) {
+        ProbeType type) {
       this.deadNodeDetector = deadNodeDetector;
       this.datanodeInfo = datanodeInfo;
       this.type = type;
@@ -323,9 +395,19 @@
     probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
     if (success) {
       if (probe.getType() == ProbeType.CHECK_DEAD) {
-        LOG.info("Remove the node out from dead node list: {}. ",
+        LOG.info("Remove the node out from dead node list: {}.",
+            probe.getDatanodeInfo());
+        removeDeadNode(probe.getDatanodeInfo());
+      } else if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+        LOG.debug("Remove the node out from suspect node list: {}.",
+            probe.getDatanodeInfo());
+        removeNodeFromDeadNodeDetector(probe.getDatanodeInfo());
+      }
+    } else {
+      if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+        LOG.info("Add the node to dead node list: {}.",
             probe.getDatanodeInfo());
-        removeNodeFromDeadNode(probe.getDatanodeInfo());
+        addToDead(probe.getDatanodeInfo());
       }
     }
   }
@@ -381,34 +463,43 @@
     return deadNodesProbeQueue;
   }
 
+  public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
+    return suspectNodesProbeQueue;
+  }
+
   /**
-   * Add datanode in deadNodes and dfsInputStreamNodes. The node is considered
-   * to dead node. The dead node is shared by all the DFSInputStreams in the
-   * same client.
+   * Add datanode to suspectNodes and suspectAndDeadNodes.
    */
   public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
       DatanodeInfo datanodeInfo) {
     HashSet<DatanodeInfo> datanodeInfos =
-        dfsInputStreamNodes.get(dfsInputStream);
+        suspectAndDeadNodes.get(dfsInputStream);
     if (datanodeInfos == null) {
       datanodeInfos = new HashSet<DatanodeInfo>();
       datanodeInfos.add(datanodeInfo);
-      dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos);
+      suspectAndDeadNodes.putIfAbsent(dfsInputStream, datanodeInfos);
     } else {
       datanodeInfos.add(datanodeInfo);
     }
 
-    addToDead(datanodeInfo);
+    addSuspectNodeToDetect(datanodeInfo);
   }
 
   /**
-   * Remove dead node which is not used by any DFSInputStream from deadNodes.
-   * @return new dead node shared by all DFSInputStreams.
+   * Add datanode to suspectNodes.
    */
+  private boolean addSuspectNodeToDetect(DatanodeInfo datanodeInfo) {
+    return suspectNodesProbeQueue.offer(datanodeInfo);
+  }
+
+  /**
+   * Remove dead node which is not used by any DFSInputStream from deadNodes.
+   * @return new dead node shared by all DFSInputStreams.
+   */
   public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
     // remove the dead nodes who doesn't have any inputstream first
     Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
-    for (HashSet<DatanodeInfo> datanodeInfos : dfsInputStreamNodes.values()) {
+    for (HashSet<DatanodeInfo> datanodeInfos : suspectAndDeadNodes.values()) {
       newDeadNodes.addAll(datanodeInfos);
     }
 
@@ -421,34 +512,46 @@
   }
 
   /**
-   * Remove dead node from dfsInputStreamNodes#dfsInputStream. If
-   * dfsInputStreamNodes#dfsInputStream does not contain any dead node, remove
-   * it from dfsInputStreamNodes.
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   * local deadNodes.
    */
   public synchronized void removeNodeFromDeadNodeDetector(
       DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
-    Set<DatanodeInfo> datanodeInfos = dfsInputStreamNodes.get(dfsInputStream);
+    Set<DatanodeInfo> datanodeInfos = suspectAndDeadNodes.get(dfsInputStream);
     if (datanodeInfos != null) {
       datanodeInfos.remove(datanodeInfo);
+      dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
       if (datanodeInfos.isEmpty()) {
-        dfsInputStreamNodes.remove(dfsInputStream);
+        suspectAndDeadNodes.remove(dfsInputStream);
       }
     }
   }
 
   /**
-   * Remove dead node from dfsInputStreamNodes#dfsInputStream and deadNodes.
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   * local deadNodes.
    */
-  public synchronized void removeNodeFromDeadNode(DatanodeInfo datanodeInfo) {
+  private synchronized void removeNodeFromDeadNodeDetector(
+      DatanodeInfo datanodeInfo) {
    for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry :
-            dfsInputStreamNodes.entrySet()) {
+            suspectAndDeadNodes.entrySet()) {
      Set<DatanodeInfo> datanodeInfos = entry.getValue();
      if (datanodeInfos.remove(datanodeInfo)) {
        DFSInputStream dfsInputStream = entry.getKey();
        dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
+        if (datanodeInfos.isEmpty()) {
+          suspectAndDeadNodes.remove(dfsInputStream);
+        }
      }
    }
+  }
 
+  /**
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   * deadNodes.
+   */
+  private void removeDeadNode(DatanodeInfo datanodeInfo) {
+    removeNodeFromDeadNodeDetector(datanodeInfo);
     removeFromDead(datanodeInfo);
   }
 
@@ -476,7 +579,11 @@
     public void run() {
       while (true) {
         deadNodeDetector.scheduleProbe(type);
-        probeSleep(deadNodeDetector.deadNodeDetectInterval);
+        if (type == ProbeType.CHECK_SUSPECT) {
+          probeSleep(deadNodeDetector.suspectNodeDetectInterval);
+        } else {
+          probeSleep(deadNodeDetector.deadNodeDetectInterval);
+        }
       }
     }
   }
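A note on the flow above: detection now happens in two stages. A datanode that fails a read is first enqueued as a suspect (addNodeToDetect delegates to addSuspectNodeToDetect), and the suspect scheduler probes it on a short interval; only a suspect that also fails its probe is promoted via addToDead to the dead node list shared by all DFSInputStreams of the client, while the slower dead-node scheduler keeps re-probing dead nodes and demotes them again once they answer. A minimal, self-contained sketch of that flow, using hypothetical names rather than the actual Hadoop classes:

  import java.util.Queue;
  import java.util.concurrent.ArrayBlockingQueue;

  // Sketch only: a two-stage suspect/dead probe flow modeled on the patch
  // above. All names and the probe() stub are hypothetical, not Hadoop APIs.
  public class SuspectProbeFlowSketch {
    private final Queue<String> suspectQueue = new ArrayBlockingQueue<>(1000);
    private final Queue<String> deadQueue = new ArrayBlockingQueue<>(100);

    // Stage 1: a read failure only marks the node as suspect. offer() never
    // blocks the reading thread and silently drops the request when the
    // queue is full, mirroring offer() in addSuspectNodeToDetect().
    public void onReadFailure(String datanodeUuid) {
      suspectQueue.offer(datanodeUuid);
    }

    // Stage 2: the suspect scheduler drains its queue on a short interval
    // (300 ms by default in this patch). A node that answers the probe is
    // forgotten; one that does not is promoted to the dead queue, which a
    // slower scheduler (60 s by default) keeps re-probing.
    public void probeSuspects() {
      String node;
      while ((node = suspectQueue.poll()) != null) {
        if (!probe(node)) {
          deadQueue.offer(node);
        }
      }
    }

    // Stand-in for the real ClientDatanodeProtocol RPC check.
    private boolean probe(String datanodeUuid) {
      return false;
    }
  }

The bounded queues are the design point: a burst of read failures can never stall readers or pile up unbounded probe work, only shed it.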
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 4218cc0..fec3958 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -160,6 +160,10 @@ public interface HdfsClientConfigKeys {
       "dfs.client.deadnode.detection.deadnode.queue.max";
   int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
+      "dfs.client.deadnode.detection.suspectnode.queue.max";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
+
   String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
       "dfs.client.deadnode.detection.probe.connection.timeout.ms";
   long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT =
@@ -169,6 +173,10 @@ public interface HdfsClientConfigKeys {
       "dfs.client.deadnode.detection.probe.deadnode.threads";
   int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10;
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY =
+      "dfs.client.deadnode.detection.probe.suspectnode.threads";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT = 10;
+
   String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY =
       "dfs.client.deadnode.detection.rpc.threads";
   int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20;
@@ -178,6 +186,11 @@ public interface HdfsClientConfigKeys {
   long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT =
       60 * 1000; // 60s
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY =
+      "dfs.client.deadnode.detection.probe.suspectnode.interval.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT =
+      300; // 300ms
+
   String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 684b617..a7e91f1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3005,6 +3005,14 @@
 </property>
 
 <property>
+  <name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
+  <value>1000</value>
+  <description>
+    The max queue size of probing suspect node.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
   <value>10</value>
   <description>
@@ -3013,6 +3021,14 @@
 </property>
 
 <property>
+  <name>dfs.client.deadnode.detection.probe.suspectnode.threads</name>
+  <value>10</value>
+  <description>
+    The maximum number of threads to use for probing suspect node.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.deadnode.detection.rpc.threads</name>
   <value>20</value>
   <description>
@@ -3029,6 +3045,14 @@
 </property>
 
 <property>
+  <name>dfs.client.deadnode.detection.probe.suspectnode.interval.ms</name>
+  <value>300</value>
+  <description>
+    Interval time in milliseconds for probing suspect node behavior.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name>
   <value>20000</value>
   <description>
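All of the new keys are client-side settings, read once when the DeadNodeDetector is constructed. A sketch of setting them programmatically instead of through hdfs-site.xml (the values shown are simply the shipped defaults, not tuning advice):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

  public class SuspectProbeConfigExample {
    public static Configuration withSuspectProbeDefaults() {
      Configuration conf = new Configuration();
      // Dead node detection must be switched on for any probing to run.
      conf.setBoolean(
          HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY,
          true);
      conf.setInt(HdfsClientConfigKeys
          .DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1000);
      conf.setInt(HdfsClientConfigKeys
          .DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY, 10);
      conf.setLong(HdfsClientConfigKeys
          .DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
          300);
      return conf;
    }
  }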
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index da800f7..58f6d5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -35,6 +35,8 @@ import java.io.IOException;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -59,10 +61,15 @@ public class TestDeadNodeDetection {
   }
 
   @Test
-  public void testDeadNodeDetectionInBackground() throws IOException {
+  public void testDeadNodeDetectionInBackground() throws Exception {
+    conf = new HdfsConfiguration();
     conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+        1000);
+    conf.setLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, 100);
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
@@ -91,22 +98,21 @@ public class TestDeadNodeDetection {
     cluster.stopDataNode(0);
 
     FSDataInputStream in = fs.open(filePath);
-    DFSInputStream din = null;
-    DFSClient dfsClient = null;
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
     try {
       try {
         in.read();
       } catch (BlockMissingException e) {
       }
 
-      din = (DFSInputStream) in.getWrappedStream();
-      dfsClient = din.getDFSClient();
+      waitForDeadNode(dfsClient, 3);
       assertEquals(3, dfsClient.getDeadNodes(din).size());
       assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
           .clearAndGetDetectedDeadNodes().size());
     } finally {
       in.close();
-      fs.delete(new Path("/testDetectDeadNodeInBackground"), true);
+      fs.delete(filePath, true);
       // check the dead node again here, the dead node is expected be removed
       assertEquals(0, dfsClient.getDeadNodes(din).size());
       assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
@@ -119,7 +125,7 @@ public class TestDeadNodeDetection {
       throws IOException {
     conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
@@ -178,7 +184,7 @@ public class TestDeadNodeDetection {
     conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
         1000);
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
@@ -201,12 +207,13 @@ public class TestDeadNodeDetection {
       } catch (BlockMissingException e) {
       }
 
+      waitForDeadNode(dfsClient, 3);
       assertEquals(3, dfsClient.getDeadNodes(din).size());
       assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
           .clearAndGetDetectedDeadNodes().size());
 
       cluster.restartDataNode(one, true);
-      waitForDeadNodeRecovery(din);
+      waitForDeadNode(dfsClient, 2);
       assertEquals(2, dfsClient.getDeadNodes(din).size());
       assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
           .clearAndGetDetectedDeadNodes().size());
@@ -225,15 +232,14 @@ public class TestDeadNodeDetection {
     conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
         1000);
     conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
-
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
 
     FileSystem fs = cluster.getFileSystem();
-    Path filePath = new Path("testDeadNodeDetectionMaxDeadNodesProbeQueue");
+    Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
     createFile(fs, filePath);
 
     // Remove three DNs,
@@ -260,6 +266,55 @@ public class TestDeadNodeDetection {
     }
   }
 
+  @Test
+  public void testDeadNodeDetectionSuspectNode() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
+    // We'll be using a 512 bytes block size just for tests
+    // so making sure the checksum bytes match it too.
+    conf.setInt("io.bytes.per.checksum", 512);
+    DeadNodeDetector.disabledProbeThreadForTest();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeDetectionSuspectNode");
+    createFile(fs, filePath);
+
+    MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    DeadNodeDetector deadNodeDetector =
+        dfsClient.getClientContext().getDeadNodeDetector();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+      waitForSuspectNode(din.getDFSClient());
+      cluster.restartDataNode(one, true);
+      Assert.assertEquals(1,
+          deadNodeDetector.getSuspectNodesProbeQueue().size());
+      Assert.assertEquals(0,
+          deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+      deadNodeDetector.startProbeScheduler();
+      Thread.sleep(1000);
+      Assert.assertEquals(0,
+          deadNodeDetector.getSuspectNodesProbeQueue().size());
+      Assert.assertEquals(0,
+          deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+    } finally {
+      in.close();
+      deleteFile(fs, filePath);
+      assertEquals(0, dfsClient.getDeadNodes(din).size());
+      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+
   private void createFile(FileSystem fs, Path filePath) throws IOException {
     FSDataOutputStream out = null;
     try {
@@ -286,12 +341,31 @@ public class TestDeadNodeDetection {
     fs.delete(filePath, true);
   }
 
-  private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception {
+  private void waitForDeadNode(DFSClient dfsClient, int size) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          if (dfsClient.getClientContext().getDeadNodeDetector()
+              .clearAndGetDetectedDeadNodes().size() == size) {
+            return true;
+          }
+        } catch (Exception e) {
+          // Ignore the exception
+        }
+
+        return false;
+      }
+    }, 5000, 100000);
+  }
+
+  private void waitForSuspectNode(DFSClient dfsClient) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         try {
-          if (din.getDFSClient().getDeadNodes(din).size() == 2) {
+          if (dfsClient.getClientContext().getDeadNodeDetector()
+              .getSuspectNodesProbeQueue().size() > 0) {
             return true;
           }
         } catch (Exception e) {
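One note on the test helpers above: GenericTestUtils.waitFor(supplier, checkEveryMillis, waitForMillis) re-evaluates the supplier until it returns true or the timeout expires, so waitForDeadNode re-checks every 5 seconds for up to 100 seconds. Since the Supplier used here is a single-method interface, the same helper can be written more compactly with a lambda on a Java 8 codebase; a condensed, assumed-equivalent form of waitForDeadNode:

  // Condensed equivalent of waitForDeadNode() above; assumes the Supplier
  // in scope is lambda-compatible (a single-method functional interface).
  private void waitForDeadNode(DFSClient dfsClient, int size) throws Exception {
    GenericTestUtils.waitFor(() -> {
      try {
        return dfsClient.getClientContext().getDeadNodeDetector()
            .clearAndGetDetectedDeadNodes().size() == size;
      } catch (Exception e) {
        return false; // keep polling on transient errors
      }
    }, 5000, 100000);
  }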
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org