Author: stack
Date: Mon Feb 24 22:34:41 2014
New Revision: 1571466

URL: http://svn.apache.org/r1571466
Log:
HDFS-5776 Support 'hedged' reads in DFSClient

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1571466&r1=1571465&r2=1571466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Feb 24 22:34:41 2014
@@ -353,6 +353,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-5698. Use protobuf to serialize / deserialize FSImage. (See breakdown
     of tasks below for features and contributors)
 
+    HDFS-5776 Support 'hedged' reads in DFSClient (Liang Xie via stack)
+
   IMPROVEMENTS
 
     HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and 

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1571466&r1=1571465&r2=1571466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Mon Feb 24 22:34:41 2014
@@ -80,6 +80,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
 
@@ -173,6 +177,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
@@ -222,6 +227,10 @@ public class DFSClient implements java.i
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
   private final ClientContext clientContext;
+  private volatile long hedgedReadThresholdMillis;
+  private static DFSHedgedReadMetrics HEDGED_READ_METRIC =
+      new DFSHedgedReadMetrics();
+  private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   
   /**
    * DFSClient configuration 
@@ -574,6 +583,15 @@ public class DFSClient implements java.i
     this.clientContext = ClientContext.get(
         conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
         dfsClientConf);
+    this.hedgedReadThresholdMillis = conf.getLong(
+        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
+    int numThreads = conf.getInt(
+        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
+    if (numThreads > 0) {
+      this.initThreadsNumForHedgedReads(numThreads);
+    }
   }
   
   /**
@@ -2714,4 +2732,64 @@ public class DFSClient implements java.i
       }
     }
   }
+
+  /**
+   * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
+   * it does not already exist.
+   * @param num Number of threads for hedged reads thread pool.
+   * If zero, skip hedged reads thread pool creation.
+   */
+  private synchronized void initThreadsNumForHedgedReads(int num) {
+    if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
+    HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex =
+            new AtomicInteger(0); 
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName("hedgedRead-" +
+              threadIndex.getAndIncrement());
+            return t;
+          }
+        },
+        new ThreadPoolExecutor.CallerRunsPolicy() {
+
+      @Override
+      public void rejectedExecution(Runnable runnable,
+          ThreadPoolExecutor e) {
+        LOG.info("Execution rejected, Executing in current thread");
+        HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
+        // will run in the current thread
+        super.rejectedExecution(runnable, e);
+      }
+    });
+    HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using hedged reads; pool threads=" + num);
+    }
+  }
+
+  long getHedgedReadTimeout() {
+    return this.hedgedReadThresholdMillis;
+  }
+
+  @VisibleForTesting
+  void setHedgedReadTimeout(long timeoutMillis) {
+    this.hedgedReadThresholdMillis = timeoutMillis;
+  }
+
+  ThreadPoolExecutor getHedgedReadsThreadPool() {
+    return HEDGED_READ_THREAD_POOL;
+  }
+
+  boolean isHedgedReadsEnabled() {
+    return (HEDGED_READ_THREAD_POOL != null) &&
+      HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
+  }
+
+  DFSHedgedReadMetrics getHedgedReadMetrics() {
+    return HEDGED_READ_METRIC;
+  }
 }
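
The pool created by initThreadsNumForHedgedReads() above is an ordinary
ThreadPoolExecutor: daemon threads, a SynchronousQueue so tasks are handed off
rather than buffered, an idle-thread timeout, and CallerRunsPolicy so a
saturated pool runs the extra read inline (which is what feeds the
hedgedReadOpsInCurThread metric). A minimal standalone sketch of that pattern,
using an illustrative class and method name that are not part of the patch:

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class HedgedReadPoolSketch {
      // Hypothetical helper mirroring the shape of initThreadsNumForHedgedReads().
      static ThreadPoolExecutor createPool(int maxThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
            60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),  // hand-off queue: no task buffering
            new ThreadFactory() {
              private final AtomicInteger index = new AtomicInteger(0);
              @Override
              public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "hedgedRead-" + index.getAndIncrement());
                t.setDaemon(true);             // like Daemon.DaemonFactory threads
                return t;
              }
            },
            // When every worker is busy, run the extra read in the caller's
            // thread instead of queueing or rejecting it.
            new ThreadPoolExecutor.CallerRunsPolicy());
        pool.allowCoreThreadTimeOut(true);     // let idle threads expire
        return pool;
      }
    }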

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1571466&r1=1571465&r2=1571466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Mon Feb 24 22:34:41 2014
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs;
-import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -46,4 +45,6 @@ public class DFSClientFaultInjector {
   public boolean failPacket() {
     return false;
   }
+
+  public void startFetchFromDatanode() {}
 }

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1571466&r1=1571465&r2=1571466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Feb 24 22:34:41 2014
@@ -591,4 +591,14 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
   public static final String  DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
   public static final int     DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
+
+  // hedged read properties
+  public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+      "dfs.client.hedged.read.threshold.millis";
+  public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+      500;
+
+  public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
+      "dfs.client.hedged.read.threadpool.size";
+  public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
 }
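
The two new keys leave hedged reads disabled by default (a threadpool size of
0 means DFSClient never creates the pool). A hedged sketch of how an
application might switch the feature on through its client Configuration
follows; the pool size, threshold value, and file path are illustrative, not
values taken from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HedgedReadClientExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Any size > 0 enables hedged reads; the default of 0 leaves them off.
        conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
        // Start a second, hedged read if the first one has not returned
        // within this many milliseconds.
        conf.setLong("dfs.client.hedged.read.threshold.millis", 150);

        FileSystem fs = FileSystem.get(conf);
        byte[] buf = new byte[4096];
        try (FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
          // Hedging applies on the positional-read path exercised here.
          in.readFully(0, buf, 0, buf.length);
        }
        fs.close();
      }
    }

Hedging only kicks in on the positional-read path (the fetchBlockByteRange /
hedgedFetchBlockByteRange code changed in DFSInputStream below), which is why
TestPread is the test that exercises it.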

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1571466&r1=1571465&r2=1571466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Feb 24 22:34:41 2014
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,9 +31,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
@@ -54,15 +58,12 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
@@ -555,7 +556,7 @@ implements ByteBufferReadable, CanSetDro
       assert (target==pos) : "Wrong postion " + pos + " expect " + target;
       long offsetIntoBlock = target - targetBlock.getStartOffset();
 
-      DNAddrPair retval = chooseDataNode(targetBlock);
+      DNAddrPair retval = chooseDataNode(targetBlock, null);
       chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
 
@@ -863,32 +864,30 @@ implements ByteBufferReadable, CanSetDro
       corruptedBlockMap.put(blk, dnSet);
     }
   }
-      
-  private DNAddrPair chooseDataNode(LocatedBlock block)
-    throws IOException {
+
+  private DNAddrPair chooseDataNode(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
       DatanodeInfo[] nodes = block.getLocations();
       try {
-        DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
-        final String dnAddr =
-            chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
-        }
-        InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
-        return new DNAddrPair(chosenNode, targetAddr);
+        return getBestNodeDNAddrPair(nodes, ignoredNodes);
       } catch (IOException ie) {
+        String errMsg =
+          getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
         if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
-          throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
-                                          block.getStartOffset());
+          String description = "Could not obtain block: " + blockInfo;
+          DFSClient.LOG.warn(description + errMsg
+              + ". Throwing a BlockMissingException");
+          throw new BlockMissingException(src, description,
+              block.getStartOffset());
         }
         
         if (nodes == null || nodes.length == 0) {
           DFSClient.LOG.info("No node available for " + blockInfo);
         }
         DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + ie
+            + " from any node: " + ie + errMsg
             + ". Will get new block locations from namenode and retry...");
         try {
           // Introducing a random factor to the wait time before another retry.
@@ -914,21 +913,99 @@ implements ByteBufferReadable, CanSetDro
         continue;
       }
     }
-  } 
-      
+  }
+
+  /**
+   * Get the best node.
+   * @param nodes Nodes to choose from.
+   * @param ignoredNodes Do not choose nodes in this collection (may be null)
+   * @return The DNAddrPair of the best node.
+   * @throws IOException
+   */
+  private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
+    final String dnAddr =
+        chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+    }
+    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+    return new DNAddrPair(chosenNode, targetAddr);
+  }
+
+  private static String getBestNodeDNAddrPairErrorString(
+      DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
+      DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
+    StringBuilder errMsgr = new StringBuilder(
+        " No live nodes contain current block ");
+    errMsgr.append("Block locations:");
+    for (DatanodeInfo datanode : nodes) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    errMsgr.append(" Dead nodes: ");
+    for (DatanodeInfo datanode : deadNodes.keySet()) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    if (ignoredNodes != null) {
+      errMsgr.append(" Ignored nodes: ");
+      for (DatanodeInfo datanode : ignoredNodes) {
+        errMsgr.append(" ");
+        errMsgr.append(datanode.toString());
+      }
+    }
+    return errMsgr.toString();
+  }
+
   private void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    //
-    // Connect to best DataNode for desired Block, with potential offset
-    //
+    block = getBlockAt(block.getStartOffset(), false);
+    while (true) {
+      DNAddrPair addressPair = chooseDataNode(block, null);
+      try {
+        actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return;
+      } catch (IOException e) {
+        // Ignore. Already processed inside the function.
+        // Loop through to try the next node.
+      }
+    }
+  }
+
+  private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+      final LocatedBlock block, final long start, final long end,
+      final ByteBuffer bb,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final CountDownLatch latch) {
+    return new Callable<ByteBuffer>() {
+      @Override
+      public ByteBuffer call() throws Exception {
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        latch.countDown();
+        return bb;
+      }
+    };
+  }
+
+  private void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long start, final long end, byte[] buf,
+      int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
-    
+
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
-      // or fetchBlockAt(). Always get the latest list of locations at the 
+      // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
       CachingStrategy curCachingStrategy;
       boolean allowShortCircuitLocalReads;
@@ -937,11 +1014,10 @@ implements ByteBufferReadable, CanSetDro
         curCachingStrategy = cachingStrategy;
         allowShortCircuitLocalReads = !shortCircuitForbidden();
       }
-      DNAddrPair retval = chooseDataNode(block);
-      DatanodeInfo chosenNode = retval.info;
-      InetSocketAddress targetAddr = retval.addr;
+      DatanodeInfo chosenNode = datanode.info;
+      InetSocketAddress targetAddr = datanode.addr;
       BlockReader reader = null;
-          
+
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
         int len = (int) (end - start + 1);
@@ -969,11 +1045,14 @@ implements ByteBufferReadable, CanSetDro
         }
         return;
       } catch (ChecksumException e) {
-        DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
-                 src + " at " + block.getBlock() + ":" + 
-                 e.getPos() + " from " + chosenNode);
+        String msg = "fetchBlockByteRange(). Got a checksum exception for "
+            + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+            + chosenNode;
+        DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
+        addToDeadNodes(chosenNode);
+        throw new IOException(msg);
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -985,22 +1064,164 @@ implements ByteBufferReadable, CanSetDro
           continue;
         } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
           refetchToken--;
-          fetchBlockAt(block.getStartOffset());
+          try {
+            fetchBlockAt(block.getStartOffset());
+          } catch (IOException fbae) {
+            // ignore IOE, since we can retry it later in a loop
+          }
           continue;
         } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr + 
-              " for file " + src + " for block " + block.getBlock() + ":" + e);
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Connection failure ", e);
-          }
+          String msg = "Failed to connect to " + targetAddr + " for file "
+              + src + " for block " + block.getBlock() + ":" + e;
+          DFSClient.LOG.warn("Connection failure: " + msg, e);
+          addToDeadNodes(chosenNode);
+          throw new IOException(msg);
         }
       } finally {
         if (reader != null) {
           reader.close();
         }
       }
-      // Put chosen node into dead list, continue
-      addToDeadNodes(chosenNode);
+    }
+  }
+
+  /**
+   * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
+   * int, Map)} except we start up a second, parallel, 'hedged' read
+   * if the first read is taking longer than the configured amount of
+   * time. We then wait on whichever read returns first.
+   * 
+   * @param block
+   * @param start
+   * @param end
+   * @param buf
+   * @param offset
+   * @param corruptedBlockMap
+   * @throws IOException
+   */
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
+      long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    ArrayList<Future<ByteBuffer>> futures = null;
+    ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+    ByteBuffer bb = null;
+    int len = (int) (end - start + 1);
+    block = getBlockAt(block.getStartOffset(), false);
+    // Latch shared by all outstanding reads.  First to finish closes
+    CountDownLatch hasReceivedResult = new CountDownLatch(1);
+    while (true) {
+      DNAddrPair chosenNode = null;
+      Future<ByteBuffer> future = null;
+      // futures is null if there is no request already executing.
+      if (futures == null) {
+        // chooseDataNode is a commitment.  If no node, we go to
+        // the NN to reget block locations.  Only go here on first read.
+        chosenNode = chooseDataNode(block, ignored);
+        bb = ByteBuffer.wrap(buf, offset, len);
+        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+          corruptedBlockMap, hasReceivedResult);
+        try {
+          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          return;
+        } catch (TimeoutException e) {
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
+              "ms to read from " + chosenNode.info + "; spawning hedged read");
+          }
+          // Ignore this node on next go around.
+          ignored.add(chosenNode.info);
+          dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+          futures = new ArrayList<Future<ByteBuffer>>();
+          futures.add(future);
+          continue; // no need to refresh block locations
+        } catch (InterruptedException e) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // Ignore already logged in the call.
+        }
+      } else {
+        // We are starting up a 'hedged' read.  We have a read already
+        // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
+        // If no nodes to do hedged reads against, pass.
+        try {
+          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          bb = ByteBuffer.allocate(len);
+          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+            corruptedBlockMap, hasReceivedResult);
+          futures.add(future);
+        } catch (IOException ioe) {
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Failed getting node for hedged read: " +
+              ioe.getMessage());
+          }
+        }
+        // if not succeeded. Submit callables for each datanode in a loop, wait
+        // for a fixed interval and get the result from the fastest one.
+        try {
+          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          // cancel the rest.
+          cancelAll(futures);
+          if (result.array() != buf) { // compare the array pointers
+            dfsClient.getHedgedReadMetrics().incHedgedReadWins();
+            System.arraycopy(result.array(), result.position(), buf, offset,
+                len);
+          } else {
+            dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+          }
+          return;
+        } catch (InterruptedException ie) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // exception already handled in the call method. getFirstToComplete
+          // will remove the failing future from the list. nothing more to do.
+        }
+        // We got here if exception.  Ignore this node on next go around.
+        ignored.add(chosenNode.info);
+      }
+      // executed if we get an error from a data node
+      block = getBlockAt(block.getStartOffset(), false);
+    }
+  }
+
+  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
+      final LocatedBlock block, long start,
+      final long end, final ByteBuffer bb,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final CountDownLatch hasReceivedResult) {
+    Callable<ByteBuffer> getFromDataNodeCallable =
+        getFromOneDataNode(chosenNode, block, start, end, bb,
+          corruptedBlockMap, hasReceivedResult);
+    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  }
+
+  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
+      CountDownLatch latch) throws ExecutionException, InterruptedException {
+    latch.await();
+    for (Future<ByteBuffer> future : futures) {
+      if (future.isDone()) {
+        try {
+          return future.get();
+        } catch (ExecutionException e) {
+          // already logged in the Callable
+          futures.remove(future);
+          throw e;
+        }
+      }
+    }
+    throw new InterruptedException("latch has counted down to zero but no "
+        + "result available yet, for safety try to request another one from "
+        + "outside loop, this should be rare");
+  }
+
+  private void cancelAll(List<Future<ByteBuffer>> futures) {
+    for (Future<ByteBuffer> future : futures) {
+      // Unfortunately, hdfs reads do not take kindly to interruption.
+      // Threads return a variety of interrupted-type exceptions but
+      // also complaints about invalid pbs -- likely because read
+      // is interrupted before gets whole pb.  Also verbose WARN
+      // logging.  So, for now, do not interrupt running read.
+      future.cancel(false);
     }
   }
 
@@ -1070,8 +1291,13 @@ implements ByteBufferReadable, CanSetDro
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
-        fetchBlockByteRange(blk, targetStart, 
-            targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+        if (dfsClient.isHedgedReadsEnabled()) {
+          hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
+              - 1, buffer, offset, corruptedBlockMap);
+        } else {
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
+        }
       } finally {
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
@@ -1265,12 +1491,13 @@ implements ByteBufferReadable, CanSetDro
    * Pick the best node from which to stream the data.
    * Entries in <i>nodes</i> are already in the priority order
    */
-  static DatanodeInfo bestNode(DatanodeInfo nodes[], 
-                               AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
-                               throws IOException {
-    if (nodes != null) { 
+  static DatanodeInfo bestNode(DatanodeInfo nodes[],
+      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    if (nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])) {
+        if (!deadNodes.containsKey(nodes[i])
+            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
           return nodes[i];
         }
       }
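
Stripped of the HDFS specifics, hedgedFetchBlockByteRange() above implements a
generic hedging pattern: issue the first read, wait up to the configured
threshold, spawn a backup read against another node, take whichever future
completes first, and cancel the rest without interrupting them. The sketch
below shows just that pattern with plain java.util.concurrent types; it is a
deliberately simplified illustration (unlike getFirstToComplete() it does not
fall back to the slower task if the faster one fails), not the DFSInputStream
code itself:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class HedgingSketch {
      // Run 'primary'; if it has not finished within thresholdMillis, also run
      // 'backup'. Return whichever completes first and cancel the loser without
      // interrupting it (the same choice cancelAll() makes above).
      static <T> T hedge(ExecutorService pool, Callable<T> primary,
          Callable<T> backup, long thresholdMillis) throws Exception {
        ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(pool);
        List<Future<T>> futures = new ArrayList<Future<T>>();
        futures.add(ecs.submit(primary));
        Future<T> first = ecs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
        if (first == null) {            // primary is slow: spawn the hedge
          futures.add(ecs.submit(backup));
          first = ecs.take();           // whichever read finishes first
        }
        for (Future<T> f : futures) {
          if (f != first) {
            f.cancel(false);            // do not interrupt the in-flight read
          }
        }
        return first.get();
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<String> slow = () -> { Thread.sleep(500); return "slow replica"; };
        Callable<String> fast = () -> { Thread.sleep(50); return "fast replica"; };
        System.out.println(hedge(pool, slow, fast, 100)); // prints "fast replica"
        pool.shutdown();
      }
    }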

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java?rev=1571466&r1=1571465&r2=1571466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java Mon Feb 24 22:34:41 2014
@@ -20,9 +20,14 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +38,9 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * This class tests the DFS positional read functionality in a single node
@@ -44,9 +52,10 @@ public class TestPread {
   boolean simulatedStorage = false;
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
+    int replication = 3;// We need > 1 blocks to test out the hedged reads.
     // test empty file open and read
     DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0,
-        blockSize, (short) 1, seed);
+      blockSize, (short)replication, seed);
     FSDataInputStream in = fileSys.open(name);
     byte[] buffer = new byte[12 * blockSize];
     in.readFully(0, buffer, 0, 0);
@@ -191,26 +200,128 @@ public class TestPread {
     assertTrue(fileSys.delete(name, true));
     assertTrue(!fileSys.exists(name));
   }
-  
+
+  private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
+      final Path file) {
+    return new Callable<Void>() {
+      public Void call() throws IOException {
+        pReadFile(fileSys, file);
+        return null;
+      }
+    };
+  }
+
   /**
    * Tests positional read in DFS.
    */
   @Test
   public void testPreadDFS() throws IOException {
-    dfsPreadTest(false, true); //normal pread
-    dfsPreadTest(true, true); //trigger read code path without transferTo.
+    Configuration conf = new Configuration();
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
   }
   
   @Test
   public void testPreadDFSNoChecksum() throws IOException {
+    Configuration conf = new Configuration();
     ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
-    dfsPreadTest(false, false);
-    dfsPreadTest(true, false);
+    dfsPreadTest(conf, false, false);
+    dfsPreadTest(conf, true, false);
   }
   
-  private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+  /**
+   * Tests positional read in DFS, with hedged reads enabled.
+   */
+  @Test
+  public void testHedgedPreadDFSBasic() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
+  }
+
+  @Test
+  public void testMaxOutHedgedReadPool() throws IOException,
+      InterruptedException, ExecutionException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int initialHedgedReadTimeoutMillis = 500;
+    final int fixedSleepIntervalMillis = 50;
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        numHedgedReadPoolThreads);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        initialHedgedReadTimeoutMillis);
+
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.instance = Mockito
+        .mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    // make preads sleep for 50ms
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(fixedSleepIntervalMillis);
+        return null;
+      }
+    }).when(injector).startFetchFromDatanode();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+
+    try {
+      Path file1 = new Path("hedgedReadMaxOut.dat");
+      writeFile(fileSys, file1);
+      // Basic test. Reads complete within timeout. Assert that there were no
+      // hedged reads.
+      pReadFile(fileSys, file1);
+      // assert that there were no hedged reads. 50ms + delta < 500ms
+      assertTrue(metrics.getHedgedReadOps() == 0);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
+      /*
+       * Reads take longer than timeout. But, only one thread reading. Assert
+       * that there were hedged reads. But, none of the reads had to run in the
+       * current thread.
+       */
+      dfsClient.setHedgedReadTimeout(50); // 50ms
+      pReadFile(fileSys, file1);
+      // assert that there were hedged reads
+      assertTrue(metrics.getHedgedReadOps() > 0);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
+      /*
+       * Multiple threads reading. Reads take longer than timeout. Assert that
+       * there were hedged reads. And that reads had to run in the current
+       * thread.
+       */
+      int factor = 10;
+      int numHedgedReads = numHedgedReadPoolThreads * factor;
+      long initialReadOpsValue = metrics.getHedgedReadOps();
+      ExecutorService executor = Executors.newFixedThreadPool(numHedgedReads);
+      ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
+      for (int i = 0; i < numHedgedReads; i++) {
+        futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
+      }
+      for (int i = 0; i < numHedgedReads; i++) {
+        futures.get(i).get();
+      }
+      assertTrue(metrics.getHedgedReadOps() > initialReadOpsValue);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() > 0);
+      cleanupFile(fileSys, file1);
+      executor.shutdown();
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+      Mockito.reset(injector);
+    }
+  }
+
+  private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
       throws IOException {
-    Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
     if (simulatedStorage) {

