Author: cnauroth
Date: Mon Jun 30 20:46:44 2014
New Revision: 1606927

URL: http://svn.apache.org/r1606927
Log:
HDFS-6591. while loop is executed tens of thousands of times in Hedged Read. Contributed by Liang Xie.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1606927&r1=1606926&r2=1606927&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun 30 20:46:44 2014
@@ -739,6 +739,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6558. Missing newline in the description of dfsadmin -rollingUpgrade.
     (Chen He via kihwal)
 
+    HDFS-6591. while loop is executed tens of thousands of times in Hedged Read
+    (Liang Xie via cnauroth)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1606927&r1=1606926&r2=1606927&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Mon Jun 30 20:46:44 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,6 +31,7 @@ import org.apache.hadoop.classification.
 @InterfaceAudience.Private
 public class DFSClientFaultInjector {
   public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+  public static AtomicLong exceptionNum = new AtomicLong(0);
 
   public static DFSClientFaultInjector get() {
     return instance;
@@ -47,4 +50,6 @@ public class DFSClientFaultInjector {
   }
 
   public void startFetchFromDatanode() {}
+
+  public void fetchFromDatanodeException() {}
 }
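
The new AtomicLong exceptionNum and the no-op fetchFromDatanodeException() hook above let a test inject a fault exactly once across concurrent reader threads: production code calls the hook, and the test overrides it to throw. A minimal, self-contained sketch of that throw-once idiom (the class and method names here are illustrative, not part of the patch):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    public class ThrowOnceSketch {
      private static final AtomicLong exceptionNum = new AtomicLong(0);

      // Called from many reader threads; compareAndSet(0, 1) succeeds for
      // exactly one caller, so the injected fault fires only once.
      static void maybeInjectFault() throws IOException {
        if (exceptionNum.compareAndSet(0, 1)) {
          throw new IOException("injected fault (test only)");
        }
      }

      public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
          try {
            maybeInjectFault();
            System.out.println("call " + i + ": ok");
          } catch (IOException e) {
            System.out.println("call " + i + ": " + e.getMessage());
          }
        }
      }
    }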

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1606927&r1=1606926&r2=1606927&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Jun 30 20:46:44 2014
@@ -32,12 +32,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -81,6 +83,7 @@ implements ByteBufferReadable, CanSetDro
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
+  private long hedgedReadOpsLoopNumForTesting = 0;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -976,20 +979,15 @@ implements ByteBufferReadable, CanSetDro
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch latch) {
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
-        try {
-          byte[] buf = bb.array();
-          int offset = bb.position();
-          actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
-              corruptedBlockMap);
-          return bb;
-        } finally {
-          latch.countDown();
-        }
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return bb;
       }
     };
   }
@@ -1018,6 +1016,7 @@ implements ByteBufferReadable, CanSetDro
       BlockReader reader = null;
 
       try {
+        DFSClientFaultInjector.get().fetchFromDatanodeException();
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
         int len = (int) (end - start + 1);
         reader = new BlockReaderFactory(dfsClient.getConf()).
@@ -1097,35 +1096,43 @@ implements ByteBufferReadable, CanSetDro
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
+    CompletionService<ByteBuffer> hedgedService =
+        new ExecutorCompletionService<ByteBuffer>(
+        dfsClient.getHedgedReadsThreadPool());
     ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     block = getBlockAt(block.getStartOffset(), false);
-    // Latch shared by all outstanding reads.  First to finish closes
-    CountDownLatch hasReceivedResult = new CountDownLatch(1);
     while (true) {
+      // see HDFS-6591, this metric is used to verify/catch unnecessary loops
+      hedgedReadOpsLoopNumForTesting++;
       DNAddrPair chosenNode = null;
-      Future<ByteBuffer> future = null;
-      // futures is null if there is no request already executing.
+      // there is no request already executing.
       if (futures.isEmpty()) {
-        // chooseDataNode is a commitment.  If no node, we go to
-        // the NN to reget block locations.  Only go here on first read.
+        // chooseDataNode is a commitment. If no node, we go to
+        // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
-        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-          corruptedBlockMap, hasReceivedResult);
+        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+            chosenNode, block, start, end, bb, corruptedBlockMap);
+        Future<ByteBuffer> firstRequest = hedgedService
+            .submit(getFromDataNodeCallable);
+        futures.add(firstRequest);
         try {
-          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
-          return;
-        } catch (TimeoutException e) {
+          Future<ByteBuffer> future = hedgedService.poll(
+              dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          if (future != null) {
+            future.get();
+            return;
+          }
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
-              "ms to read from " + chosenNode.info + "; spawning hedged read");
+            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
+                + "ms to read from " + chosenNode.info
+                + "; spawning hedged read");
           }
           // Ignore this node on next go around.
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
-          futures.add(future);
           continue; // no need to refresh block locations
         } catch (InterruptedException e) {
           // Ignore
@@ -1133,25 +1140,31 @@ implements ByteBufferReadable, CanSetDro
           // Ignore already logged in the call.
         }
       } else {
-        // We are starting up a 'hedged' read.  We have a read already
+        // We are starting up a 'hedged' read. We have a read already
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
         try {
-          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          try {
+            chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          } catch (IOException ioe) {
+            chosenNode = chooseDataNode(block, ignored);
+          }
           bb = ByteBuffer.allocate(len);
-          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
-            corruptedBlockMap, hasReceivedResult);
-          futures.add(future);
+          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+              chosenNode, block, start, end, bb, corruptedBlockMap);
+          Future<ByteBuffer> oneMoreRequest = hedgedService
+              .submit(getFromDataNodeCallable);
+          futures.add(oneMoreRequest);
         } catch (IOException ioe) {
           if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Failed getting node for hedged read: " +
-              ioe.getMessage());
+            DFSClient.LOG.debug("Failed getting node for hedged read: "
+                + ioe.getMessage());
           }
         }
         // if not succeeded. Submit callables for each datanode in a loop, wait
         // for a fixed interval and get the result from the fastest one.
         try {
-          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          ByteBuffer result = getFirstToComplete(hedgedService, futures);
           // cancel the rest.
           cancelAll(futures);
           if (result.array() != buf) { // compare the array pointers
@@ -1163,50 +1176,43 @@ implements ByteBufferReadable, CanSetDro
           }
           return;
         } catch (InterruptedException ie) {
-          // Ignore
-        } catch (ExecutionException e) {
-          // exception already handled in the call method. getFirstToComplete
-          // will remove the failing future from the list. nothing more to do.
+          // Ignore and retry
         }
-        // We got here if exception.  Ignore this node on next go around IFF
+        // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
         if (chosenNode != null && chosenNode.info != null) {
           ignored.add(chosenNode.info);
         }
       }
-      // executed if we get an error from a data node
-      block = getBlockAt(block.getStartOffset(), false);
     }
   }
 
-  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
-      final LocatedBlock block, long start,
-      final long end, final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      final CountDownLatch hasReceivedResult) {
-    Callable<ByteBuffer> getFromDataNodeCallable =
-        getFromOneDataNode(chosenNode, block, start, end, bb,
-          corruptedBlockMap, hasReceivedResult);
-    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  @VisibleForTesting
+  public long getHedgedReadOpsLoopNumForTesting() {
+    return hedgedReadOpsLoopNumForTesting;
   }
 
-  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
-      CountDownLatch latch) throws ExecutionException, InterruptedException {
-    latch.await();
-    for (Future<ByteBuffer> future : futures) {
-      if (future.isDone()) {
-        try {
-          return future.get();
-        } catch (ExecutionException e) {
-          // already logged in the Callable
-          futures.remove(future);
-          throw e;
-        }
-      }
+  private ByteBuffer getFirstToComplete(
+      CompletionService<ByteBuffer> hedgedService,
+      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
+    if (futures.isEmpty()) {
+      throw new InterruptedException("let's retry");
+    }
+    Future<ByteBuffer> future = null;
+    try {
+      future = hedgedService.take();
+      ByteBuffer bb = future.get();
+      futures.remove(future);
+      return bb;
+    } catch (ExecutionException e) {
+      // already logged in the Callable
+      futures.remove(future);
+    } catch (CancellationException ce) {
+      // already logged in the Callable
+      futures.remove(future);
     }
-    throw new InterruptedException("latch has counted down to zero but no"
-        + "result available yet, for safety try to request another one from"
-        + "outside loop, this should be rare");
+
+    throw new InterruptedException("let's retry");
   }
 
   private void cancelAll(List<Future<ByteBuffer>> futures) {
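
The heart of this change is replacing the shared CountDownLatch, plus a scan of the futures list for isDone(), with an ExecutorCompletionService: its blocking poll/take hands back the first completed task directly, so each pass through the while loop waits instead of spinning. A standalone sketch of that pattern (names, timings, and the String payload are illustrative, not the patch's own code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class HedgedReadSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> hedgedService =
            new ExecutorCompletionService<String>(pool);
        List<Future<String>> futures = new ArrayList<Future<String>>();
        final long hedgeTimeoutMs = 50; // hypothetical threshold

        // First request: wait up to the hedge threshold for it to finish.
        futures.add(hedgedService.submit(new Callable<String>() {
          public String call() throws Exception {
            Thread.sleep(200); // simulate a slow datanode
            return "primary";
          }
        }));
        Future<String> done = hedgedService.poll(hedgeTimeoutMs,
            TimeUnit.MILLISECONDS);
        if (done == null) {
          // Timed out: submit a hedge, then block until the first of the
          // outstanding requests completes -- no busy re-polling needed.
          futures.add(hedgedService.submit(new Callable<String>() {
            public String call() {
              return "hedge";
            }
          }));
          done = hedgedService.take();
        }
        System.out.println(done.get());
        // Cancel the losers; canceling an already-done future is a no-op.
        for (Future<String> f : futures) {
          f.cancel(true);
        }
        pool.shutdownNow();
      }
    }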

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java?rev=1606927&r1=1606926&r2=1606927&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java Mon Jun 30 20:46:44 2014
@@ -31,12 +31,16 @@ import java.util.concurrent.Future;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +53,14 @@ import org.mockito.stubbing.Answer;
 public class TestPread {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 4096;
-  boolean simulatedStorage = false;
+  boolean simulatedStorage;
+  boolean isHedgedRead;
+
+  @Before
+  public void setup() {
+    simulatedStorage = false;
+    isHedgedRead = false;
+  }
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
     int replication = 3;// We need > 1 blocks to test out the hedged reads.
@@ -73,7 +84,7 @@ public class TestPread {
     
     // now create the real file
     DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 12 * blockSize,
-        blockSize, (short) 1, seed);
+        blockSize, (short) replication, seed);
   }
   
   private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
@@ -104,8 +115,13 @@ public class TestPread {
     }
 
     if (dfstm != null) {
-      assertEquals("Expected read statistic to be incremented", length, dfstm
-          .getReadStatistics().getTotalBytesRead() - totalRead);
+      if (isHedgedRead) {
+        assertTrue("Expected read statistic to be incremented", length <= dfstm
+            .getReadStatistics().getTotalBytesRead() - totalRead);
+      } else {
+        assertEquals("Expected read statistic to be incremented", length, dfstm
+            .getReadStatistics().getTotalBytesRead() - totalRead);
+      }
     }
   }
   
@@ -208,7 +224,7 @@ public class TestPread {
     stm.readFully(0, actual);
     checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
   }
-  
+
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
     assertTrue(fileSys.delete(name, true));
@@ -249,6 +265,7 @@ public class TestPread {
    */
   @Test
   public void testHedgedPreadDFSBasic() throws IOException {
+    isHedgedRead = true;
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
     conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1);
@@ -258,8 +275,72 @@ public class TestPread {
   }
 
   @Test
+  public void testHedgedReadLoopTooManyTimes() throws IOException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int hedgedReadTimeoutMillis = 50;
+
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        numHedgedReadPoolThreads);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        hedgedReadTimeoutMillis);
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.instance = Mockito
+        .mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        if (true) {
+          Thread.sleep(hedgedReadTimeoutMillis + 1);
+          if (DFSClientFaultInjector.exceptionNum.compareAndSet(0, 1)) {
+            System.out.println("-------------- throw Checksum Exception");
+            throw new ChecksumException("ChecksumException test", 100);
+          }
+        }
+        return null;
+      }
+    }).when(injector).fetchFromDatanodeException();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    FSDataOutputStream output = null;
+    DFSInputStream input = null;
+    String filename = "/hedgedReadMaxOut.dat";
+    try {
+      
+      Path file = new Path(filename);
+      output = fileSys.create(file, (short) 2);
+      byte[] data = new byte[64 * 1024];
+      output.write(data);
+      output.flush();
+      output.write(data);
+      output.flush();
+      output.write(data);
+      output.flush();
+      output.close();
+      byte[] buffer = new byte[64 * 1024];
+      input = dfsClient.open(filename);
+      input.read(0, buffer, 0, 1024);
+      input.close();
+      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+    } catch (BlockMissingException e) {
+      assertTrue(false);
+    } finally {
+      IOUtils.cleanup(null, input);
+      IOUtils.cleanup(null, output);
+      fileSys.close();
+      cluster.shutdown();
+      Mockito.reset(injector);
+    }
+  }
+
+  @Test
   public void testMaxOutHedgedReadPool() throws IOException,
       InterruptedException, ExecutionException {
+    isHedgedRead = true;
     Configuration conf = new Configuration();
     int numHedgedReadPoolThreads = 5;
     final int initialHedgedReadTimeoutMillis = 50000;
@@ -367,7 +448,6 @@ public class TestPread {
   public void testPreadDFSSimulated() throws IOException {
     simulatedStorage = true;
     testPreadDFS();
-    simulatedStorage = false;
   }
   
   /**
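
The new regression test can be run on its own from hadoop-hdfs-project/hadoop-hdfs with the standard Surefire selector (usual Maven usage; exact flags may vary by branch):

    mvn test -Dtest=TestPread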

