HDFS-11303. Hedged read might hang infinitely if read data from all DN failed. Contributed by Chen Zhang, Wei-chiu Chuang, and John Zhuge.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b242f09
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b242f09
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b242f09

Branch: refs/heads/HDFS-7240
Commit: 8b242f09a61a7536d2422546bfa6c2aaf1d57ed6
Parents: 28d97b7
Author: John Zhuge <jzh...@cloudera.com>
Authored: Thu Aug 10 14:04:36 2017 -0700
Committer: John Zhuge <jzh...@apache.org>
Committed: Fri Aug 11 19:42:07 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java | 11 ++--
 .../java/org/apache/hadoop/hdfs/TestPread.java | 63 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b242f09/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index dcc997c..6bff172 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1131,8 +1131,9 @@ public class DFSInputStream extends FSInputStream
       Future<ByteBuffer> firstRequest = hedgedService
           .submit(getFromDataNodeCallable);
       futures.add(firstRequest);
+      Future<ByteBuffer> future = null;
       try {
-        Future<ByteBuffer> future = hedgedService.poll(
+        future = hedgedService.poll(
             conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
         if (future != null) {
           ByteBuffer result = future.get();
@@ -1142,16 +1143,18 @@ public class DFSInputStream extends FSInputStream
         }
         DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
             + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
-        // Ignore this node on next go around.
-        ignored.add(chosenNode.info);
         dfsClient.getHedgedReadMetrics().incHedgedReadOps();
         // continue; no need to refresh block locations
       } catch (ExecutionException e) {
-        // Ignore
+        futures.remove(future);
       } catch (InterruptedException e) {
         throw new InterruptedIOException(
             "Interrupted while waiting for reading task");
       }
+      // Ignore this node on next go around.
+      // If poll timeout and the request still ongoing, don't consider it
+      // again. If read data failed, don't consider it either.
+      ignored.add(chosenNode.info);
     } else {
       // We are starting up a 'hedged' read. We have a read already
       // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
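The shape of the fix is easier to see outside the diff. Below is a minimal, self-contained Java sketch of the hedged-read retry loop after this change; it is not the HDFS implementation, and the node names, thread pool, THRESHOLD_MILLIS constant, and always-failing callable are stand-ins. The key points mirror the patch: a future that failed with ExecutionException is removed from the tracking list, and the chosen node is added to `ignored` on both the timeout path and the failure path, so the loop drains the replica list and terminates instead of re-picking a dead DataNode forever.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedReadLoopSketch {
  // Stand-in for conf.getHedgedReadThresholdMillis().
  static final long THRESHOLD_MILLIS = 50;

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<ByteBuffer> hedgedService =
        new ExecutorCompletionService<>(pool);
    List<Future<ByteBuffer>> futures = new ArrayList<>();
    Set<String> ignored = new HashSet<>(); // nodes we refuse to pick again

    for (String chosenNode : Arrays.asList("dn1", "dn2", "dn3")) {
      if (ignored.contains(chosenNode)) {
        continue; // mirrors skipping ignored replicas when choosing a node
      }
      // Every read fails here, modeling "read data from all DN failed".
      futures.add(hedgedService.submit(() -> {
        throw new IOException("read from " + chosenNode + " failed");
      }));
      Future<ByteBuffer> future = null;
      try {
        future = hedgedService.poll(THRESHOLD_MILLIS, TimeUnit.MILLISECONDS);
        if (future != null) {
          ByteBuffer result = future.get(); // success would end the loop
          System.out.println("read " + result.remaining() + " bytes");
          break;
        }
        // poll() timed out: the request is still ongoing, so a hedged read
        // against a different node would be spawned on the next iteration.
      } catch (ExecutionException e) {
        futures.remove(future); // the fix: drop the failed future ...
      }
      ignored.add(chosenNode); // ... and ignore this node from now on
    }
    // All replicas exhausted: the real code would surface the read failure.
    System.out.println("gave up after ignoring " + ignored);
    pool.shutdownNow();
  }
}

With every stand-in read failing, the loop visits each of the three nodes exactly once and then gives up; the new test below pins this down by asserting getHedgedReadOpsLoopNumForTesting() returns 3 on a 3-DataNode cluster when BlockMissingException surfaces.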
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b242f09/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 85fc97b..bcb02b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -59,6 +59,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Supplier;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
 
 /**
  * This class tests the DFS positional read functionality in a single node
@@ -72,6 +74,9 @@ public class TestPread {
   boolean simulatedStorage;
   boolean isHedgedRead;
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestPread.class.getName());
+
   @Before
   public void setup() {
     simulatedStorage = false;
@@ -551,6 +556,64 @@ public class TestPread {
     }
   }
 
+  @Test(timeout=30000)
+  public void testHedgedReadFromAllDNFailed() throws IOException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int hedgedReadTimeoutMillis = 50;
+
+    conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
+        numHedgedReadPoolThreads);
+    conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
+        hedgedReadTimeoutMillis);
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (true) {
+          LOG.info("-------------- throw Checksum Exception");
+          throw new ChecksumException("ChecksumException test", 100);
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    FSDataOutputStream output = null;
+    DFSInputStream input = null;
+    String filename = "/hedgedReadMaxOut.dat";
+    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+    // Metrics instance is static, so we need to reset counts from prior tests.
+    metrics.hedgedReadOps.set(0);
+    try {
+      Path file = new Path(filename);
+      output = fileSys.create(file, (short) 2);
+      byte[] data = new byte[64 * 1024];
+      output.write(data);
+      output.flush();
+      output.close();
+      byte[] buffer = new byte[64 * 1024];
+      input = dfsClient.open(filename);
+      input.read(0, buffer, 0, 1024);
+      Assert.fail("Reading the block should have thrown BlockMissingException");
+    } catch (BlockMissingException e) {
+      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+      assertTrue(metrics.getHedgedReadOps() == 0);
+    } finally {
+      Mockito.reset(injector);
+      IOUtils.cleanupWithLogger(LOG, input);
+      IOUtils.cleanupWithLogger(LOG, output);
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Scenario: 1. Write a file with RF=2, DN1 and DN2<br>
    * 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.<br>
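Hedged reads are opt-in on the client, and the test above enables them through the same two configuration keys any application would use. A minimal usage sketch follows, assuming a placeholder NameNode URI (hdfs://namenode:8020) and file path (/data/sample.dat); a thread pool size greater than zero is what turns hedging on.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class HedgedReadClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Pool size > 0 enables hedged reads; the threshold is how long to wait
    // on the first replica before hedging the read to another one.
    conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 5);
    conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 50);

    try (FileSystem fs =
             FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
         FSDataInputStream in = fs.open(new Path("/data/sample.dat"))) {
      byte[] buffer = new byte[1024];
      // Positional reads are the code path that hedges when enabled.
      int n = in.read(0, buffer, 0, buffer.length);
      System.out.println("read " + n + " bytes");
    }
  }
}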