This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit a8c0083b1bea830454bfee37805c67828cd05f4f Author: sunlisheng <sunlish...@apache.org> AuthorDate: Mon Mar 15 11:34:13 2021 +0800 HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun. (cherry picked from commit 7025f39944e628345109b43cba2cd4d49ca8cc6b) --- .../org/apache/hadoop/hdfs/DeadNodeDetector.java | 85 +++++++++++++--------- .../hadoop/hdfs/client/HdfsClientConfigKeys.java | 10 +-- .../src/main/resources/hdfs-default.xml | 20 ++--- .../apache/hadoop/hdfs/TestDeadNodeDetection.java | 71 ++++++++++++------ 4 files changed, 107 insertions(+), 79 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java index e17f261..cd46551 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java @@ -29,9 +29,9 @@ import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.Deque; +import java.util.LinkedList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -40,8 +40,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT; @@ -54,9 +52,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT; /** * Detect the dead nodes in advance, and share this information among all the @@ -74,7 +72,7 @@ public class DeadNodeDetector extends Daemon { /** * Waiting time when DeadNodeDetector's state is idle. */ - private static final long IDLE_SLEEP_MS = 10000; + private final long idleSleepMs; /** * Client context name. @@ -114,16 +112,6 @@ public class DeadNodeDetector extends Daemon { private long suspectNodeDetectInterval = 0; /** - * The max queue size of probing dead node. - */ - private int maxDeadNodesProbeQueueLen = 0; - - /** - * The max queue size of probing suspect node. - */ - private int maxSuspectNodesProbeQueueLen; - - /** * Connection timeout for probing dead node in milliseconds. */ private long probeConnectionTimeoutMs; @@ -131,12 +119,12 @@ public class DeadNodeDetector extends Daemon { /** * The dead node probe queue. */ - private Queue<DatanodeInfo> deadNodesProbeQueue; + private UniqueQueue<DatanodeInfo> deadNodesProbeQueue; /** * The suspect node probe queue. */ - private Queue<DatanodeInfo> suspectNodesProbeQueue; + private UniqueQueue<DatanodeInfo> suspectNodesProbeQueue; /** * The thread pool of probing dead node. @@ -182,6 +170,32 @@ public class DeadNodeDetector extends Daemon { } /** + * The thread safe unique queue. + */ + static class UniqueQueue<T> { + private Deque<T> queue = new LinkedList<>(); + private Set<T> set = new HashSet<>(); + + synchronized boolean offer(T dn) { + if (set.add(dn)) { + queue.addLast(dn); + return true; + } + return false; + } + + synchronized T poll() { + T dn = queue.pollFirst(); + set.remove(dn); + return dn; + } + + synchronized int size() { + return set.size(); + } + } + + /** * Disabled start probe suspect/dead thread for the testing. */ private static volatile boolean disabledProbeThreadForTest = false; @@ -203,20 +217,14 @@ public class DeadNodeDetector extends Daemon { DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT); socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT); - maxDeadNodesProbeQueueLen = - conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, - DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT); - maxSuspectNodesProbeQueueLen = - conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, - DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT); probeConnectionTimeoutMs = conf.getLong( DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT); + this.deadNodesProbeQueue = new UniqueQueue<>(); + this.suspectNodesProbeQueue = new UniqueQueue<>(); - this.deadNodesProbeQueue = - new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen); - this.suspectNodesProbeQueue = - new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen); + idleSleepMs = conf.getLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT); int deadNodeDetectDeadThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY, @@ -447,8 +455,7 @@ public class DeadNodeDetector extends Daemon { for (DatanodeInfo datanodeInfo : datanodeInfos) { if (!deadNodesProbeQueue.offer(datanodeInfo)) { LOG.debug("Skip to add dead node {} to check " + - "since the probe queue is full.", datanodeInfo); - break; + "since the node is already in the probe queue.", datanodeInfo); } else { LOG.debug("Add dead node to check: {}.", datanodeInfo); } @@ -458,7 +465,7 @@ public class DeadNodeDetector extends Daemon { private void idle() { try { - Thread.sleep(IDLE_SLEEP_MS); + Thread.sleep(idleSleepMs); } catch (InterruptedException e) { LOG.debug("Got interrupted while DeadNodeDetector is idle.", e); Thread.currentThread().interrupt(); @@ -483,14 +490,24 @@ public class DeadNodeDetector extends Daemon { deadNodes.remove(datanodeInfo.getDatanodeUuid()); } - public Queue<DatanodeInfo> getDeadNodesProbeQueue() { + public UniqueQueue<DatanodeInfo> getDeadNodesProbeQueue() { return deadNodesProbeQueue; } - public Queue<DatanodeInfo> getSuspectNodesProbeQueue() { + public UniqueQueue<DatanodeInfo> getSuspectNodesProbeQueue() { return suspectNodesProbeQueue; } + @VisibleForTesting + void setSuspectQueue(UniqueQueue<DatanodeInfo> queue) { + this.suspectNodesProbeQueue = queue; + } + + @VisibleForTesting + void setDeadQueue(UniqueQueue<DatanodeInfo> queue) { + this.deadNodesProbeQueue = queue; + } + /** * Add datanode to suspectNodes and suspectAndDeadNodes. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 0c35c8d..aef3102 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -161,13 +161,9 @@ public interface HdfsClientConfigKeys { "dfs.client.deadnode.detection.enabled"; boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false; - String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY = - "dfs.client.deadnode.detection.deadnode.queue.max"; - int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100; - - String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY = - "dfs.client.deadnode.detection.suspectnode.queue.max"; - int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000; + String DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY = + "dfs.client.deadnode.detection.idle.sleep.ms"; + long DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT = 10000; String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY = "dfs.client.deadnode.detection.probe.connection.timeout.ms"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ca1618f..6424265 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3120,26 +3120,18 @@ </property> <property> - <name>dfs.client.deadnode.detection.deadnode.queue.max</name> - <value>100</value> - <description> - The max queue size of probing dead node. - </description> - </property> - - <property> - <name>dfs.client.deadnode.detection.suspectnode.queue.max</name> - <value>1000</value> + <name>dfs.client.deadnode.detection.probe.deadnode.threads</name> + <value>10</value> <description> - The max queue size of probing suspect node. + The maximum number of threads to use for probing dead node. </description> </property> <property> - <name>dfs.client.deadnode.detection.probe.deadnode.threads</name> - <value>10</value> + <name>dfs.client.deadnode.detection.idle.sleep.ms</name> + <value>10000</value> <description> - The maximum number of threads to use for probing dead node. + The sleep time of DeadNodeDetector per iteration. </description> </property> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java index 9134f36..e8da918 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java @@ -30,19 +30,20 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; +import java.util.Collection; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertNotSame; @@ -73,6 +74,7 @@ public class TestDeadNodeDetection { DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, 1000); conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0); + conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY, 100); } @After @@ -247,42 +249,63 @@ public class TestDeadNodeDetection { } @Test - public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception { - conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); - cluster.waitActive(); - - FileSystem fs = cluster.getFileSystem(); - Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue"); - createFile(fs, filePath); - - // Remove three DNs, - cluster.stopDataNode(0); - cluster.stopDataNode(0); - cluster.stopDataNode(0); - - FSDataInputStream in = fs.open(filePath); - DFSInputStream din = (DFSInputStream) in.getWrappedStream(); - DFSClient dfsClient = din.getDFSClient(); + public void testDeadNodeDetectionDeadNodeProbe() throws Exception { + FileSystem fs = null; + FSDataInputStream in = null; + Path filePath = new Path("/" + GenericTestUtils.getMethodName()); try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + fs = cluster.getFileSystem(); + createFile(fs, filePath); + + // Remove three DNs, + cluster.stopDataNode(0); + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + in = fs.open(filePath); + DFSInputStream din = (DFSInputStream) in.getWrappedStream(); + DFSClient dfsClient = din.getDFSClient(); + DeadNodeDetector deadNodeDetector = + dfsClient.getClientContext().getDeadNodeDetector(); + // Spy suspect queue and dead queue. + DeadNodeDetector.UniqueQueue<DatanodeInfo> queue = + deadNodeDetector.getSuspectNodesProbeQueue(); + DeadNodeDetector.UniqueQueue<DatanodeInfo> suspectSpy = + Mockito.spy(queue); + deadNodeDetector.setSuspectQueue(suspectSpy); + queue = deadNodeDetector.getDeadNodesProbeQueue(); + DeadNodeDetector.UniqueQueue<DatanodeInfo> deadSpy = Mockito.spy(queue); + deadNodeDetector.setDeadQueue(deadSpy); + // Trigger dead node detection. try { in.read(); } catch (BlockMissingException e) { } Thread.sleep(1500); - Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector() - .getDeadNodesProbeQueue().size() - + dfsClient.getDeadNodes(din).size()) <= 4); + Collection<DatanodeInfo> deadNodes = + dfsClient.getDeadNodeDetector().clearAndGetDetectedDeadNodes(); + assertEquals(3, deadNodes.size()); + for (DatanodeInfo dead : deadNodes) { + // Each node is suspected once then marked as dead. + Mockito.verify(suspectSpy, Mockito.times(1)).offer(dead); + // All the dead nodes should be scheduled and probed at least once. + Mockito.verify(deadSpy, Mockito.atLeastOnce()).offer(dead); + Mockito.verify(deadSpy, Mockito.atLeastOnce()).poll(); + } } finally { - in.close(); + if (in != null) { + in.close(); + } deleteFile(fs, filePath); } } @Test public void testDeadNodeDetectionSuspectNode() throws Exception { - conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1); DeadNodeDetector.setDisabledProbeThreadForTest(true); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org