Author: cnauroth Date: Mon Jun 30 20:46:44 2014 New Revision: 1606927 URL: http://svn.apache.org/r1606927 Log: HDFS-6591. while loop is executed tens of thousands of times in Hedged Read. Contributed by Liang Xie.
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1606927&r1=1606926&r2=1606927&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun 30 20:46:44 2014 @@ -739,6 +739,9 @@ Release 2.5.0 - UNRELEASED HDFS-6558. Missing newline in the description of dfsadmin -rollingUpgrade. (Chen He via kihwal) + HDFS-6591. while loop is executed tens of thousands of times in Hedged Read + (Liang Xie via cnauroth) + BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh) Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1606927&r1=1606926&r2=1606927&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Mon Jun 30 20:46:44 2014 @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import java.util.concurrent.atomic.AtomicLong; + import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; @@ -29,6 +31,7 @@ import org.apache.hadoop.classification. @InterfaceAudience.Private public class DFSClientFaultInjector { public static DFSClientFaultInjector instance = new DFSClientFaultInjector(); + public static AtomicLong exceptionNum = new AtomicLong(0); public static DFSClientFaultInjector get() { return instance; @@ -47,4 +50,6 @@ public class DFSClientFaultInjector { } public void startFetchFromDatanode() {} + + public void fetchFromDatanodeException() {} } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1606927&r1=1606926&r2=1606927&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Jun 30 20:46:44 2014 @@ -32,12 +32,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -81,6 +83,7 @@ implements ByteBufferReadable, CanSetDro HasEnhancedByteBufferAccess { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; + private long hedgedReadOpsLoopNumForTesting = 0; private final DFSClient dfsClient; private boolean closed = false; private final String src; @@ -976,20 +979,15 @@ implements ByteBufferReadable, CanSetDro private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode, final LocatedBlock block, final long start, final long end, final ByteBuffer bb, - final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, - final CountDownLatch latch) { + final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { return new Callable<ByteBuffer>() { @Override public ByteBuffer call() throws Exception { - try { - byte[] buf = bb.array(); - int offset = bb.position(); - actualGetFromOneDataNode(datanode, block, start, end, buf, offset, - corruptedBlockMap); - return bb; - } finally { - latch.countDown(); - } + byte[] buf = bb.array(); + int offset = bb.position(); + actualGetFromOneDataNode(datanode, block, start, end, buf, offset, + corruptedBlockMap); + return bb; } }; } @@ -1018,6 +1016,7 @@ implements ByteBufferReadable, CanSetDro BlockReader reader = null; try { + DFSClientFaultInjector.get().fetchFromDatanodeException(); Token<BlockTokenIdentifier> blockToken = block.getBlockToken(); int len = (int) (end - start + 1); reader = new BlockReaderFactory(dfsClient.getConf()). @@ -1097,35 +1096,43 @@ implements ByteBufferReadable, CanSetDro Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>(); + CompletionService<ByteBuffer> hedgedService = + new ExecutorCompletionService<ByteBuffer>( + dfsClient.getHedgedReadsThreadPool()); ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>(); ByteBuffer bb = null; int len = (int) (end - start + 1); block = getBlockAt(block.getStartOffset(), false); - // Latch shared by all outstanding reads. First to finish closes - CountDownLatch hasReceivedResult = new CountDownLatch(1); while (true) { + // see HDFS-6591, this metric is used to verify/catch unnecessary loops + hedgedReadOpsLoopNumForTesting++; DNAddrPair chosenNode = null; - Future<ByteBuffer> future = null; - // futures is null if there is no request already executing. + // there is no request already executing. if (futures.isEmpty()) { - // chooseDataNode is a commitment. If no node, we go to - // the NN to reget block locations. Only go here on first read. + // chooseDataNode is a commitment. If no node, we go to + // the NN to reget block locations. Only go here on first read. chosenNode = chooseDataNode(block, ignored); bb = ByteBuffer.wrap(buf, offset, len); - future = getHedgedReadFuture(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); + Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( + chosenNode, block, start, end, bb, corruptedBlockMap); + Future<ByteBuffer> firstRequest = hedgedService + .submit(getFromDataNodeCallable); + futures.add(firstRequest); try { - future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS); - return; - } catch (TimeoutException e) { + Future<ByteBuffer> future = hedgedService.poll( + dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS); + if (future != null) { + future.get(); + return; + } if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() + - "ms to read from " + chosenNode.info + "; spawning hedged read"); + DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() + + "ms to read from " + chosenNode.info + + "; spawning hedged read"); } // Ignore this node on next go around. ignored.add(chosenNode.info); dfsClient.getHedgedReadMetrics().incHedgedReadOps(); - futures.add(future); continue; // no need to refresh block locations } catch (InterruptedException e) { // Ignore @@ -1133,25 +1140,31 @@ implements ByteBufferReadable, CanSetDro // Ignore already logged in the call. } } else { - // We are starting up a 'hedged' read. We have a read already + // We are starting up a 'hedged' read. We have a read already // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. // If no nodes to do hedged reads against, pass. try { - chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); + try { + chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); + } catch (IOException ioe) { + chosenNode = chooseDataNode(block, ignored); + } bb = ByteBuffer.allocate(len); - future = getHedgedReadFuture(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); - futures.add(future); + Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( + chosenNode, block, start, end, bb, corruptedBlockMap); + Future<ByteBuffer> oneMoreRequest = hedgedService + .submit(getFromDataNodeCallable); + futures.add(oneMoreRequest); } catch (IOException ioe) { if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Failed getting node for hedged read: " + - ioe.getMessage()); + DFSClient.LOG.debug("Failed getting node for hedged read: " + + ioe.getMessage()); } } // if not succeeded. Submit callables for each datanode in a loop, wait // for a fixed interval and get the result from the fastest one. try { - ByteBuffer result = getFirstToComplete(futures, hasReceivedResult); + ByteBuffer result = getFirstToComplete(hedgedService, futures); // cancel the rest. cancelAll(futures); if (result.array() != buf) { // compare the array pointers @@ -1163,50 +1176,43 @@ implements ByteBufferReadable, CanSetDro } return; } catch (InterruptedException ie) { - // Ignore - } catch (ExecutionException e) { - // exception already handled in the call method. getFirstToComplete - // will remove the failing future from the list. nothing more to do. + // Ignore and retry } - // We got here if exception. Ignore this node on next go around IFF + // We got here if exception. Ignore this node on next go around IFF // we found a chosenNode to hedge read against. if (chosenNode != null && chosenNode.info != null) { ignored.add(chosenNode.info); } } - // executed if we get an error from a data node - block = getBlockAt(block.getStartOffset(), false); } } - private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode, - final LocatedBlock block, long start, - final long end, final ByteBuffer bb, - final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, - final CountDownLatch hasReceivedResult) { - Callable<ByteBuffer> getFromDataNodeCallable = - getFromOneDataNode(chosenNode, block, start, end, bb, - corruptedBlockMap, hasReceivedResult); - return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable); + @VisibleForTesting + public long getHedgedReadOpsLoopNumForTesting() { + return hedgedReadOpsLoopNumForTesting; } - private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures, - CountDownLatch latch) throws ExecutionException, InterruptedException { - latch.await(); - for (Future<ByteBuffer> future : futures) { - if (future.isDone()) { - try { - return future.get(); - } catch (ExecutionException e) { - // already logged in the Callable - futures.remove(future); - throw e; - } - } + private ByteBuffer getFirstToComplete( + CompletionService<ByteBuffer> hedgedService, + ArrayList<Future<ByteBuffer>> futures) throws InterruptedException { + if (futures.isEmpty()) { + throw new InterruptedException("let's retry"); + } + Future<ByteBuffer> future = null; + try { + future = hedgedService.take(); + ByteBuffer bb = future.get(); + futures.remove(future); + return bb; + } catch (ExecutionException e) { + // already logged in the Callable + futures.remove(future); + } catch (CancellationException ce) { + // already logged in the Callable + futures.remove(future); } - throw new InterruptedException("latch has counted down to zero but no" - + "result available yet, for safety try to request another one from" - + "outside loop, this should be rare"); + + throw new InterruptedException("let's retry"); } private void cancelAll(List<Future<ByteBuffer>> futures) { Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java?rev=1606927&r1=1606926&r2=1606927&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java Mon Jun 30 20:46:44 2014 @@ -31,12 +31,16 @@ import java.util.concurrent.Future; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Level; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -49,7 +53,14 @@ import org.mockito.stubbing.Answer; public class TestPread { static final long seed = 0xDEADBEEFL; static final int blockSize = 4096; - boolean simulatedStorage = false; + boolean simulatedStorage; + boolean isHedgedRead; + + @Before + public void setup() { + simulatedStorage = false; + isHedgedRead = false; + } private void writeFile(FileSystem fileSys, Path name) throws IOException { int replication = 3;// We need > 1 blocks to test out the hedged reads. @@ -73,7 +84,7 @@ public class TestPread { // now create the real file DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize, - blockSize, (short) 1, seed); + blockSize, (short) replication, seed); } private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) { @@ -104,8 +115,13 @@ public class TestPread { } if (dfstm != null) { - assertEquals("Expected read statistic to be incremented", length, dfstm - .getReadStatistics().getTotalBytesRead() - totalRead); + if (isHedgedRead) { + assertTrue("Expected read statistic to be incremented", length <= dfstm + .getReadStatistics().getTotalBytesRead() - totalRead); + } else { + assertEquals("Expected read statistic to be incremented", length, dfstm + .getReadStatistics().getTotalBytesRead() - totalRead); + } } } @@ -208,7 +224,7 @@ public class TestPread { stm.readFully(0, actual); checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test"); } - + private void cleanupFile(FileSystem fileSys, Path name) throws IOException { assertTrue(fileSys.exists(name)); assertTrue(fileSys.delete(name, true)); @@ -249,6 +265,7 @@ public class TestPread { */ @Test public void testHedgedPreadDFSBasic() throws IOException { + isHedgedRead = true; Configuration conf = new Configuration(); conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5); conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1); @@ -258,8 +275,72 @@ public class TestPread { } @Test + public void testHedgedReadLoopTooManyTimes() throws IOException { + Configuration conf = new Configuration(); + int numHedgedReadPoolThreads = 5; + final int hedgedReadTimeoutMillis = 50; + + conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, + numHedgedReadPoolThreads); + conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, + hedgedReadTimeoutMillis); + // Set up the InjectionHandler + DFSClientFaultInjector.instance = Mockito + .mock(DFSClientFaultInjector.class); + DFSClientFaultInjector injector = DFSClientFaultInjector.instance; + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + if (true) { + Thread.sleep(hedgedReadTimeoutMillis + 1); + if (DFSClientFaultInjector.exceptionNum.compareAndSet(0, 1)) { + System.out.println("-------------- throw Checksum Exception"); + throw new ChecksumException("ChecksumException test", 100); + } + } + return null; + } + }).when(injector).fetchFromDatanodeException(); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .format(true).build(); + DistributedFileSystem fileSys = cluster.getFileSystem(); + DFSClient dfsClient = fileSys.getClient(); + FSDataOutputStream output = null; + DFSInputStream input = null; + String filename = "/hedgedReadMaxOut.dat"; + try { + + Path file = new Path(filename); + output = fileSys.create(file, (short) 2); + byte[] data = new byte[64 * 1024]; + output.write(data); + output.flush(); + output.write(data); + output.flush(); + output.write(data); + output.flush(); + output.close(); + byte[] buffer = new byte[64 * 1024]; + input = dfsClient.open(filename); + input.read(0, buffer, 0, 1024); + input.close(); + assertEquals(3, input.getHedgedReadOpsLoopNumForTesting()); + } catch (BlockMissingException e) { + assertTrue(false); + } finally { + IOUtils.cleanup(null, input); + IOUtils.cleanup(null, output); + fileSys.close(); + cluster.shutdown(); + Mockito.reset(injector); + } + } + + @Test public void testMaxOutHedgedReadPool() throws IOException, InterruptedException, ExecutionException { + isHedgedRead = true; Configuration conf = new Configuration(); int numHedgedReadPoolThreads = 5; final int initialHedgedReadTimeoutMillis = 50000; @@ -367,7 +448,6 @@ public class TestPread { public void testPreadDFSSimulated() throws IOException { simulatedStorage = true; testPreadDFS(); - simulatedStorage = false; } /**