This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 9b6906f  HDFS-14651. DeadNodeDetector checks dead node periodically. Contributed by Lisheng Sun.
9b6906f is described below

commit 9b6906fe914829f50076c2291dba59d425475d7b
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Fri Nov 22 10:53:55 2019 +0800

    HDFS-14651. DeadNodeDetector checks dead node periodically. Contributed by Lisheng Sun.
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |   2 +-
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   | 308 ++++++++++++++++++++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  22 ++
 .../src/main/resources/hdfs-default.xml            |  40 +++
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 154 +++++++++--
 5 files changed, 504 insertions(+), 22 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index abb039c..6ee5277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -150,7 +150,7 @@ public class ClientContext {
         conf.getWriteByteArrayManagerConf());
     this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
     if (deadNodeDetectionEnabled && deadNodeDetector == null) {
-      deadNodeDetector = new DeadNodeDetector(name);
+      deadNodeDetector = new DeadNodeDetector(name, config);
       deadNodeDetectorThr = new Daemon(deadNodeDetector);
       deadNodeDetectorThr.start();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index 1ac29a7..2fe7cf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -17,13 +17,40 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
+import static
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; /** * Detect the dead nodes in advance, and share this information among all the @@ -48,10 +75,12 @@ public class DeadNodeDetector implements Runnable { */ private String name; + private Configuration conf; + /** * Dead nodes shared by all the DFSInputStreams of the client. */ - private final ConcurrentHashMap<String, DatanodeInfo> deadNodes; + private final Map<String, DatanodeInfo> deadNodes; /** * Record dead nodes by one DFSInputStream. When dead node is not used by one @@ -59,10 +88,67 @@ public class DeadNodeDetector implements Runnable { * DFSInputStream does not include any dead node, remove DFSInputStream from * dfsInputStreamNodes. */ - private final ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>> + private final Map<DFSInputStream, HashSet<DatanodeInfo>> dfsInputStreamNodes; /** + * Datanodes that is being probed. + */ + private Map<String, DatanodeInfo> probeInProg = + new ConcurrentHashMap<String, DatanodeInfo>(); + + /** + * The last time when detect dead node. + */ + private long lastDetectDeadTS = 0; + + /** + * Interval time in milliseconds for probing dead node behavior. + */ + private long deadNodeDetectInterval = 0; + + /** + * The max queue size of probing dead node. + */ + private int maxDeadNodesProbeQueueLen = 0; + + /** + * Connection timeout for probing dead node in milliseconds. + */ + private long probeConnectionTimeoutMs; + + /** + * The dead node probe queue. + */ + private Queue<DatanodeInfo> deadNodesProbeQueue; + + /** + * The thread pool of probing dead node. + */ + private ExecutorService probeDeadNodesThreadPool; + + /** + * The scheduler thread of probing dead node. + */ + private Thread probeDeadNodesSchedulerThr; + + /** + * The thread pool of probing datanodes' rpc request. Sometimes the data node + * can hang and not respond to the client in a short time. And these node will + * filled with probe thread pool and block other normal node probing. + */ + private ExecutorService rpcThreadPool; + + private int socketTimeout; + + /** + * The type of probe. + */ + private enum ProbeType { + CHECK_DEAD + } + + /** * The state of DeadNodeDetector. 
*/ private enum State { @@ -71,12 +157,40 @@ public class DeadNodeDetector implements Runnable { private State state; - public DeadNodeDetector(String name) { + public DeadNodeDetector(String name, Configuration conf) { + this.conf = new Configuration(conf); this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>(); this.dfsInputStreamNodes = new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>(); this.name = name; + deadNodeDetectInterval = conf.getLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT); + socketTimeout = + conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT); + maxDeadNodesProbeQueueLen = + conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT); + probeConnectionTimeoutMs = conf.getLong( + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT); + + this.deadNodesProbeQueue = + new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen); + + int deadNodeDetectDeadThreads = + conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT); + int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT); + probeDeadNodesThreadPool = Executors.newFixedThreadPool( + deadNodeDetectDeadThreads, new Daemon.DaemonFactory()); + rpcThreadPool = + Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory()); + + startProbeScheduler(); + LOG.info("Start dead node detector for DFSClient {}.", this.name); state = State.INIT; } @@ -91,6 +205,9 @@ public class DeadNodeDetector implements Runnable { case INIT: init(); break; + case CHECK_DEAD: + checkDeadNodes(); + break; case IDLE: idle(); break; @@ -106,6 +223,134 @@ public class DeadNodeDetector implements Runnable { } } + /** + * Start probe dead node thread. + */ + private void startProbeScheduler() { + probeDeadNodesSchedulerThr = + new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD)); + probeDeadNodesSchedulerThr.setDaemon(true); + probeDeadNodesSchedulerThr.start(); + } + + /** + * Prode datanode by probe byte. + */ + private void scheduleProbe(ProbeType type) { + LOG.debug("Schedule probe datanode for probe type: {}.", type); + DatanodeInfo datanodeInfo = null; + if (type == ProbeType.CHECK_DEAD) { + while ((datanodeInfo = deadNodesProbeQueue.poll()) != null) { + if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) { + LOG.debug("The datanode {} is already contained in probe queue, " + + "skip to add it.", datanodeInfo); + continue; + } + probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo); + Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD); + probeDeadNodesThreadPool.execute(probe); + } + } + } + + /** + * Request the data node through rpc, and determine the data node status based + * on the returned result. 
+ */ + class Probe implements Runnable { + private DeadNodeDetector deadNodeDetector; + private DatanodeInfo datanodeInfo; + private ProbeType type; + + Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo, + ProbeType type) { + this.deadNodeDetector = deadNodeDetector; + this.datanodeInfo = datanodeInfo; + this.type = type; + } + + public DatanodeInfo getDatanodeInfo() { + return datanodeInfo; + } + + public ProbeType getType() { + return type; + } + + @Override + public void run() { + LOG.debug("Check node: {}, type: {}.", datanodeInfo, type); + try { + final ClientDatanodeProtocol proxy = + DFSUtilClient.createClientDatanodeProtocolProxy(datanodeInfo, + deadNodeDetector.conf, socketTimeout, true); + + Future<DatanodeLocalInfo> future = rpcThreadPool.submit(new Callable() { + @Override + public DatanodeLocalInfo call() throws Exception { + return proxy.getDatanodeInfo(); + } + }); + + try { + future.get(probeConnectionTimeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type, + e); + deadNodeDetector.probeCallBack(this, false); + return; + } finally { + future.cancel(true); + } + deadNodeDetector.probeCallBack(this, true); + return; + } catch (Exception e) { + LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type, + e); + } + + deadNodeDetector.probeCallBack(this, false); + } + } + + /** + * Handle data node, according to probe result. When ProbeType is CHECK_DEAD, + * remove the datanode from DeadNodeDetector#deadNodes if probe success. + */ + private void probeCallBack(Probe probe, boolean success) { + LOG.debug("Probe datanode: {} result: {}, type: {}", + probe.getDatanodeInfo(), success, probe.getType()); + probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid()); + if (success) { + if (probe.getType() == ProbeType.CHECK_DEAD) { + LOG.info("Remove the node out from dead node list: {}. ", + probe.getDatanodeInfo()); + removeNodeFromDeadNode(probe.getDatanodeInfo()); + } + } + } + + /** + * Check dead node periodically. + */ + private void checkDeadNodes() { + long ts = Time.monotonicNow(); + if (ts - lastDetectDeadTS > deadNodeDetectInterval) { + Set<DatanodeInfo> datanodeInfos = clearAndGetDetectedDeadNodes(); + for (DatanodeInfo datanodeInfo : datanodeInfos) { + LOG.debug("Add dead node to check: {}.", datanodeInfo); + if (!deadNodesProbeQueue.offer(datanodeInfo)) { + LOG.debug("Skip to add dead node {} to check " + + "since the probe queue is full.", datanodeInfo); + break; + } + } + lastDetectDeadTS = ts; + } + + state = State.IDLE; + } + private void idle() { try { Thread.sleep(IDLE_SLEEP_MS); @@ -113,11 +358,11 @@ public class DeadNodeDetector implements Runnable { } - state = State.IDLE; + state = State.CHECK_DEAD; } private void init() { - state = State.IDLE; + state = State.CHECK_DEAD; } private void addToDead(DatanodeInfo datanodeInfo) { @@ -128,6 +373,14 @@ public class DeadNodeDetector implements Runnable { return deadNodes.containsKey(datanodeInfo.getDatanodeUuid()); } + private void removeFromDead(DatanodeInfo datanodeInfo) { + deadNodes.remove(datanodeInfo.getDatanodeUuid()); + } + + public Queue<DatanodeInfo> getDeadNodesProbeQueue() { + return deadNodesProbeQueue; + } + /** * Add datanode in deadNodes and dfsInputStreamNodes. The node is considered * to dead node. 
The dead node is shared by all the DFSInputStreams in the @@ -182,4 +435,49 @@ public class DeadNodeDetector implements Runnable { } } } + + /** + * Remove dead node from dfsInputStreamNodes#dfsInputStream and deadNodes. + */ + public synchronized void removeNodeFromDeadNode(DatanodeInfo datanodeInfo) { + for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry : + dfsInputStreamNodes.entrySet()) { + Set<DatanodeInfo> datanodeInfos = entry.getValue(); + if (datanodeInfos.remove(datanodeInfo)) { + DFSInputStream dfsInputStream = entry.getKey(); + dfsInputStream.removeFromLocalDeadNodes(datanodeInfo); + } + } + + removeFromDead(datanodeInfo); + } + + private static void probeSleep(long time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Schedule probe data node. + */ + static class ProbeScheduler implements Runnable { + private DeadNodeDetector deadNodeDetector; + private ProbeType type; + + ProbeScheduler(DeadNodeDetector deadNodeDetector, ProbeType type) { + this.deadNodeDetector = deadNodeDetector; + this.type = type; + } + + @Override + public void run() { + while (true) { + deadNodeDetector.scheduleProbe(type); + probeSleep(deadNodeDetector.deadNodeDetectInterval); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index e32a3bb..4218cc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -156,6 +156,28 @@ public interface HdfsClientConfigKeys { "dfs.client.deadnode.detection.enabled"; boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false; + String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY = + "dfs.client.deadnode.detection.deadnode.queue.max"; + int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100; + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY = + "dfs.client.deadnode.detection.probe.connection.timeout.ms"; + long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT = + 20000; + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY = + "dfs.client.deadnode.detection.probe.deadnode.threads"; + int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10; + + String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY = + "dfs.client.deadnode.detection.rpc.threads"; + int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20; + + String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY = + "dfs.client.deadnode.detection.probe.deadnode.interval.ms"; + long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT = + 60 * 1000; // 60s + String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d2103fb..684b617 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2996,6 +2996,46 @@ </description> </property> + <property> + 
<name>dfs.client.deadnode.detection.deadnode.queue.max</name> + <value>100</value> + <description> + The max queue size of probing dead node. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.deadnode.threads</name> + <value>10</value> + <description> + The maximum number of threads to use for probing dead node. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.rpc.threads</name> + <value>20</value> + <description> + The maximum number of threads to use for calling RPC call to recheck the liveness of dead node. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.deadnode.interval.ms</name> + <value>60000</value> + <description> + Interval time in milliseconds for probing dead node behavior. + </description> + </property> + + <property> + <name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name> + <value>20000</value> + <description> + Connection timeout for probing dead node in milliseconds. + </description> + </property> + <property> <name>dfs.namenode.lease-recheck-interval-ms</name> <value>2000</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java index 9b997ab..da800f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java @@ -17,19 +17,24 @@ */ package org.apache.hadoop.hdfs; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY; import static org.junit.Assert.assertEquals; /** @@ -121,21 +126,7 @@ public class TestDeadNodeDetection { FileSystem fs = cluster.getFileSystem(); Path filePath = new Path("/testDeadNodeMultipleDFSInputStream"); - - // 256 bytes data chunk for writes - byte[] bytes = new byte[256]; - for (int index = 0; index < bytes.length; index++) { - bytes[index] = '0'; - } - - // File with a 512 bytes block size - FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512); - - // Write a block to DN (2x256bytes). 
- out.write(bytes); - out.write(bytes); - out.hflush(); - out.close(); + createFile(fs, filePath); String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid(); FSDataInputStream in1 = fs.open(filePath); @@ -170,7 +161,7 @@ public class TestDeadNodeDetection { } finally { in1.close(); in2.close(); - fs.delete(new Path("/testDeadNodeMultipleDFSInputStream"), true); + deleteFile(fs, filePath); // check the dead node again here, the dead node is expected be removed assertEquals(0, dfsClient1.getDeadNodes(din1).size()); assertEquals(0, dfsClient2.getDeadNodes(din2).size()); @@ -180,4 +171,135 @@ public class TestDeadNodeDetection { .clearAndGetDetectedDeadNodes().size()); } } + + @Test + public void testDeadNodeDetectionDeadNodeRecovery() throws Exception { + conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true); + conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, + 1000); + // We'll be using a 512 bytes block size just for tests + // so making sure the checksum bytes too match it. + conf.setInt("io.bytes.per.checksum", 512); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDeadNodeDetectionDeadNodeRecovery"); + createFile(fs, filePath); + + // Remove three DNs, + MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0); + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + FSDataInputStream in = fs.open(filePath); + DFSInputStream din = (DFSInputStream) in.getWrappedStream(); + DFSClient dfsClient = din.getDFSClient(); + try { + try { + in.read(); + } catch (BlockMissingException e) { + } + + assertEquals(3, dfsClient.getDeadNodes(din).size()); + assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + + cluster.restartDataNode(one, true); + waitForDeadNodeRecovery(din); + assertEquals(2, dfsClient.getDeadNodes(din).size()); + assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } finally { + in.close(); + deleteFile(fs, filePath); + assertEquals(0, dfsClient.getDeadNodes(din).size()); + assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } + } + + @Test + public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception { + conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true); + conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, + 1000); + conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1); + + // We'll be using a 512 bytes block size just for tests + // so making sure the checksum bytes too match it. 
+ conf.setInt("io.bytes.per.checksum", 512); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("testDeadNodeDetectionMaxDeadNodesProbeQueue"); + createFile(fs, filePath); + + // Remove three DNs, + cluster.stopDataNode(0); + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + FSDataInputStream in = fs.open(filePath); + DFSInputStream din = (DFSInputStream) in.getWrappedStream(); + DFSClient dfsClient = din.getDFSClient(); + try { + try { + in.read(); + } catch (BlockMissingException e) { + } + + Thread.sleep(1500); + Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector() + .getDeadNodesProbeQueue().size() + + dfsClient.getDeadNodes(din).size()) <= 4); + } finally { + in.close(); + deleteFile(fs, filePath); + } + } + + private void createFile(FileSystem fs, Path filePath) throws IOException { + FSDataOutputStream out = null; + try { + // 256 bytes data chunk for writes + byte[] bytes = new byte[256]; + for (int index = 0; index < bytes.length; index++) { + bytes[index] = '0'; + } + + // File with a 512 bytes block size + out = fs.create(filePath, true, 4096, (short) 3, 512); + + // Write a block to all 3 DNs (2x256bytes). + out.write(bytes); + out.write(bytes); + out.hflush(); + + } finally { + out.close(); + } + } + + private void deleteFile(FileSystem fs, Path filePath) throws IOException { + fs.delete(filePath, true); + } + + private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + if (din.getDFSClient().getDeadNodes(din).size() == 2) { + return true; + } + } catch (Exception e) { + // Ignore the exception + } + + return false; + } + }, 5000, 100000); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
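The behaviour this patch extends is opt-in on the client side: dead node detection has to be enabled, and the new keys in HdfsClientConfigKeys / hdfs-default.xml bound the probe queue, the two thread pools, the probe interval, and the per-probe connection timeout. A minimal sketch of how a client could enable and tune it; the key names and default values come from the patch, while the class name, the file path, and the values chosen here are purely illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeadNodeDetectionConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Turn on the shared, client-wide dead node detector (disabled by default).
    conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
    // Re-check suspected dead datanodes every 60 seconds (the patch default).
    conf.setLong("dfs.client.deadnode.detection.probe.deadnode.interval.ms", 60000L);
    // Bound the probe queue and the two thread pools added by the patch.
    conf.setInt("dfs.client.deadnode.detection.deadnode.queue.max", 100);
    conf.setInt("dfs.client.deadnode.detection.probe.deadnode.threads", 10);
    conf.setInt("dfs.client.deadnode.detection.rpc.threads", 20);
    // Give up on a probe if the datanode does not answer within 20 seconds.
    conf.setLong("dfs.client.deadnode.detection.probe.connection.timeout.ms", 20000L);

    // Every DFSInputStream opened through this client now shares one DeadNodeDetector.
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/illustrative/path"))) {
      in.read();
    }
  }
}

With dfs.client.deadnode.detection.enabled left at its default of false, ClientContext never constructs a DeadNodeDetector, so none of the new threads or queues exist.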
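Inside the detector, checkDeadNodes() copies the currently known dead nodes into a bounded queue once per detection interval, a scheduler thread drains that queue, and a fixed-size pool executes the probes, with an in-progress map preventing two concurrent probes of the same datanode. Below is a simplified, self-contained sketch of that pipeline; the class name, the String node IDs, and probeOnce() are placeholders rather than the actual Hadoop types:

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProbePipelineSketch {
  // Bounded queue of suspected dead nodes, mirroring deadNodesProbeQueue.
  private final Queue<String> probeQueue = new ArrayBlockingQueue<>(100);
  // Nodes with a probe currently in flight, mirroring probeInProg.
  private final Map<String, Boolean> probeInProgress = new ConcurrentHashMap<>();
  // Globally known dead nodes (uuid -> address), mirroring deadNodes.
  private final Map<String, String> deadNodes = new ConcurrentHashMap<>();
  private final ExecutorService probePool = Executors.newFixedThreadPool(10);
  private final long detectIntervalMs = 60_000L;
  private long lastDetectMs = 0L;

  /** Periodic step: enqueue every known dead node without ever blocking. */
  synchronized void checkDeadNodes() {
    long now = System.currentTimeMillis();
    if (now - lastDetectMs <= detectIntervalMs) {
      return;
    }
    for (String uuid : deadNodes.keySet()) {
      if (!probeQueue.offer(uuid)) {
        break; // queue is full; remaining nodes wait for the next cycle
      }
    }
    lastDetectMs = now;
  }

  /** Scheduler step: drain the queue, skipping nodes already being probed. */
  void scheduleProbes() {
    String uuid;
    while ((uuid = probeQueue.poll()) != null) {
      if (probeInProgress.putIfAbsent(uuid, Boolean.TRUE) != null) {
        continue; // a probe for this node is already running
      }
      final String target = uuid;
      probePool.execute(() -> {
        boolean alive = probeOnce(target); // stands in for the RPC probe (next sketch)
        probeInProgress.remove(target);
        if (alive) {
          deadNodes.remove(target); // node answered, so take it off the dead list
        }
      });
    }
  }

  // Placeholder: the real probe issues ClientDatanodeProtocol#getDatanodeInfo over RPC.
  private boolean probeOnce(String datanodeUuid) {
    return false;
  }
}

In the patch the equivalent of scheduleProbes() runs on a dedicated ProbeScheduler daemon thread that sleeps for the detection interval between passes.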
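The probe itself has to survive datanodes that accept a connection and then hang, so the patch submits the actual RPC (getDatanodeInfo() on a ClientDatanodeProtocol proxy) to a separate rpcThreadPool and bounds the wait with Future#get, cancelling the task on timeout so a stuck call cannot occupy a probe worker. A minimal sketch of that pattern; the class name and slowDatanodeRpc() are assumed placeholders for the real proxy call:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedProbeSketch {
  // Mirrors rpcThreadPool: a hanging datanode only ties up a thread here,
  // never one of the probe worker threads.
  private final ExecutorService rpcPool = Executors.newFixedThreadPool(20);
  private final long probeTimeoutMs = 20_000L;

  /** Returns true only if the node answers within the configured timeout. */
  boolean probeWithTimeout(final String datanodeAddress) {
    Future<Void> future = rpcPool.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        slowDatanodeRpc(datanodeAddress); // placeholder for proxy.getDatanodeInfo()
        return null;
      }
    });
    try {
      future.get(probeTimeoutMs, TimeUnit.MILLISECONDS);
      return true; // the node responded: report the probe as a success
    } catch (TimeoutException e) {
      return false; // no answer in time: keep treating the node as dead
    } catch (Exception e) {
      return false; // connection refused, interrupted, RPC error, ...
    } finally {
      future.cancel(true); // never leave a hung call holding an rpcPool thread
    }
  }

  // Placeholder for the real RPC; assumed to possibly block for a long time.
  private void slowDatanodeRpc(String address) throws Exception {
    Thread.sleep(1);
  }
}

In the patch the boolean outcome is delivered through probeCallBack(), which on success removes the node from the shared dead list and from every DFSInputStream's local dead nodes.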
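One behavioural change that is easy to miss in the hunks above: init() and idle() now hand control to a new CHECK_DEAD state instead of leaving the detector parked in IDLE, so run() keeps alternating between checking and idling. A compact sketch of the resulting loop; the state names follow the patch, while the sleep value and the stop flag are assumptions added for illustration:

public class DetectorLoopSketch implements Runnable {
  private enum State { INIT, CHECK_DEAD, IDLE, ERROR }

  private static final long IDLE_SLEEP_MS = 10_000L; // assumed idle pause
  private volatile boolean stopped = false;          // assumed shutdown flag
  private State state = State.INIT;

  @Override
  public void run() {
    while (!stopped) {
      switch (state) {
      case INIT:
        state = State.CHECK_DEAD;   // init() now starts a check instead of idling
        break;
      case CHECK_DEAD:
        checkDeadNodes();           // enqueue suspects for probing, then go idle
        state = State.IDLE;
        break;
      case IDLE:
        sleepQuietly(IDLE_SLEEP_MS);
        state = State.CHECK_DEAD;   // idle() now loops back to CHECK_DEAD
        break;
      default:                      // ERROR or unknown: stop the detector
        stopped = true;
      }
    }
  }

  private void checkDeadNodes() {
    // Placeholder: see the queue/scheduler sketch above.
  }

  private void sleepQuietly(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}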