Repository: hadoop
Updated Branches:
  refs/heads/trunk 1043795f7 -> 08bb6c49a


HDFS-13926. ThreadLocal aggregations for FileSystem.Statistics are incorrect 
with striped reads.
Contributed by Xiao Chen, Hrishikesh Gadre.

Signed-off-by: Xiao Chen <x...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08bb6c49
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08bb6c49
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08bb6c49

Branch: refs/heads/trunk
Commit: 08bb6c49a5aec32b7d9f29238560f947420405d6
Parents: 1043795
Author: Hrishikesh Gadre <hga...@apache.org>
Authored: Mon Oct 8 20:30:53 2018 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Mon Oct 8 20:31:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  8 +++
 .../hadoop/hdfs/DFSStripedInputStream.java      | 22 +++++++
 .../org/apache/hadoop/hdfs/ReaderStrategy.java  | 15 -----
 .../org/apache/hadoop/hdfs/StripeReader.java    | 23 ++++---
 .../apache/hadoop/hdfs/util/IOUtilsClient.java  | 10 ++-
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 65 ++++++++++++++++++--
 .../erasurecode/ErasureCodingWorker.java        |  3 +-
 .../erasurecode/StripedBlockReader.java         | 14 +++--
 .../datanode/erasurecode/StripedReader.java     | 17 ++---
 .../erasurecode/StripedReconstructor.java       |  3 +-
 .../TestDistributedFileSystemWithECFile.java    | 44 +++++++++++++
 11 files changed, 178 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e5640d2..52ed1d4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -90,6 +90,8 @@ import com.google.common.annotations.VisibleForTesting;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles
  * negotiation of the namenode and various datanodes as necessary.
@@ -769,6 +771,12 @@ public class DFSInputStream extends FSInputStream
             // got a EOS from reader though we expect more data on it.
             throw new IOException("Unexpected EOS from the reader");
           }
+          updateReadStatistics(readStatistics, result, blockReader);
+          dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+              result);
+          if (readStatistics.getBlockType() == BlockType.STRIPED) {
+            dfsClient.updateFileSystemECReadStats(result);
+          }
           return result;
         } catch (ChecksumException ce) {
           throw ce;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 5557a50..3f688d4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -54,6 +54,8 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /**
  * DFSStripedInputStream reads from striped block groups.
  */
@@ -329,6 +331,26 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   /**
+   * Update read statistics. Note that this has to be done on the thread that
+   * initiates the read, rather than inside each async thread, for
+   * {@link org.apache.hadoop.fs.FileSystem.Statistics} to work correctly with
+   * its ThreadLocal.
+   *
+   * @param stats striped read stats
+   */
+  void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
+    if (stats == null) {
+      return;
+    }
+    updateReadStatistics(readStatistics, stats.getBytesRead(),
+        stats.isShortCircuit(), stats.getNetworkDistance());
+    dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
+        stats.getBytesRead());
+    assert readStatistics.getBlockType() == BlockType.STRIPED;
+    dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
+  }
+
+  /**
    * Seek to a new arbitrary location.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
index 39ad2ff..4d5e741 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
@@ -17,11 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.hdfs.protocol.BlockType;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
 
 /**
  * Wraps different possible read implementations so that callers can be
@@ -120,12 +117,6 @@ class ByteArrayStrategy implements ReaderStrategy {
                            int length) throws IOException {
     int nRead = blockReader.read(readBuf, offset, length);
     if (nRead > 0) {
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
       offset += nRead;
     }
     return nRead;
@@ -190,12 +181,6 @@ class ByteBufferStrategy implements ReaderStrategy {
     // Only when data are read, update the position
     if (nRead > 0) {
       readBuf.position(readBuf.position() + nRead);
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
     }
 
     return nRead;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 0554ebe..e90af84 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
@@ -105,9 +106,10 @@ abstract class StripeReader {
     }
   }
 
-  protected final Map<Future<Void>, Integer> futures = new HashMap<>();
+  private final Map<Future<BlockReadStats>, Integer> futures =
+      new HashMap<>();
   protected final AlignedStripe alignedStripe;
-  protected final CompletionService<Void> service;
+  private final CompletionService<BlockReadStats> service;
   protected final LocatedBlock[] targetBlocks;
   protected final CorruptedBlocks corruptedBlocks;
   protected final BlockReaderInfo[] readerInfos;
@@ -257,7 +259,7 @@ abstract class StripeReader {
     }
   }
 
-  private Callable<Void> readCells(final BlockReader reader,
+  private Callable<BlockReadStats> readCells(final BlockReader reader,
       final DatanodeInfo datanode, final long currentReaderOffset,
       final long targetReaderOffset, final ByteBufferStrategy[] strategies,
       final ExtendedBlock currentBlock) {
@@ -275,10 +277,13 @@ abstract class StripeReader {
             skipped == targetReaderOffset - currentReaderOffset);
       }
 
+      int ret = 0;
       for (ByteBufferStrategy strategy : strategies) {
-        readToBuffer(reader, datanode, strategy, currentBlock);
+        int bytesRead = readToBuffer(reader, datanode, strategy,
+            currentBlock);
+        ret += bytesRead;
       }
-      return null;
+      return new BlockReadStats(ret, reader.isShortCircuit(),
+          reader.getNetworkDistance());
     };
   }
 
@@ -303,13 +308,14 @@ abstract class StripeReader {
     }
 
     chunk.state = StripingChunk.PENDING;
-    Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
+    Callable<BlockReadStats> readCallable =
+        readCells(readerInfos[chunkIndex].reader,
         readerInfos[chunkIndex].datanode,
         readerInfos[chunkIndex].blockReaderOffset,
         alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
         block.getBlock());
 
-    Future<Void> request = service.submit(readCallable);
+    Future<BlockReadStats> request = service.submit(readCallable);
     futures.put(request, chunkIndex);
     return true;
   }
@@ -342,6 +348,7 @@ abstract class StripeReader {
       try {
         StripingChunkReadResult r = StripedBlockUtil
             .getNextCompletedStripedRead(service, futures, 0);
+        dfsStripedInputStream.updateReadStats(r.getReadStats());
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
               + alignedStripe);
@@ -460,7 +467,7 @@ abstract class StripeReader {
   }
 
   void clearFutures() {
-    for (Future<Void> future : futures.keySet()) {
+    for (Future future : futures.keySet()) {
       future.cancel(false);
     }
     futures.clear();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
index 85e9cee..25d8a0f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -48,13 +48,19 @@ public class IOUtilsClient {
 
   public static void updateReadStatistics(ReadStatistics readStatistics,
                                       int nRead, BlockReader blockReader) {
+    updateReadStatistics(readStatistics, nRead, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
+  }
+
+  public static void updateReadStatistics(ReadStatistics readStatistics,
+      int nRead, boolean isShortCircuit, int networkDistance) {
     if (nRead <= 0) {
       return;
     }
 
-    if (blockReader.isShortCircuit()) {
+    if (isShortCircuit) {
       readStatistics.addShortCircuitBytes(nRead);
-    } else if (blockReader.getNetworkDistance() == 0) {
+    } else if (networkDistance == 0) {
       readStatistics.addLocalBytes(nRead);
     } else {
       readStatistics.addRemoteBytes(nRead);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 0425927..2245757 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -77,6 +77,48 @@ public class StripedBlockUtil {
       LoggerFactory.getLogger(StripedBlockUtil.class);
 
   /**
+   * Struct holding the read statistics. This is used when reads are done
+   * asynchronously, to allow the async threads return the read stats and let
+   * the main reading thread to update the stats. This is important for the
+   * ThreadLocal stats for the main reading thread to be correct.
+   */
+  public static class BlockReadStats {
+    private final int bytesRead;
+    private final boolean isShortCircuit;
+    private final int networkDistance;
+
+    public BlockReadStats(int numBytesRead, boolean shortCircuit,
+        int distance) {
+      bytesRead = numBytesRead;
+      isShortCircuit = shortCircuit;
+      networkDistance = distance;
+    }
+
+    public int getBytesRead() {
+      return bytesRead;
+    }
+
+    public boolean isShortCircuit() {
+      return isShortCircuit;
+    }
+
+    public int getNetworkDistance() {
+      return networkDistance;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("bytesRead=").append(bytesRead);
+      sb.append(',');
+      sb.append("isShortCircuit=").append(isShortCircuit);
+      sb.append(',');
+      sb.append("networkDistance=").append(networkDistance);
+      return sb.toString();
+    }
+  }
+
+  /**
    * This method parses a striped block group into individual blocks.
    *
    * @param bg The striped block group
@@ -245,10 +287,11 @@ public class StripedBlockUtil {
    * @throws InterruptedException
    */
   public static StripingChunkReadResult getNextCompletedStripedRead(
-      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
+      CompletionService<BlockReadStats> readService,
+      Map<Future<BlockReadStats>, Integer> futures,
       final long timeoutMillis) throws InterruptedException {
     Preconditions.checkArgument(!futures.isEmpty());
-    Future<Void> future = null;
+    Future<BlockReadStats> future = null;
     try {
       if (timeoutMillis > 0) {
         future = readService.poll(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -256,9 +299,9 @@ public class StripedBlockUtil {
         future = readService.take();
       }
       if (future != null) {
-        future.get();
+        final BlockReadStats stats = future.get();
         return new StripingChunkReadResult(futures.remove(future),
-            StripingChunkReadResult.SUCCESSFUL);
+            StripingChunkReadResult.SUCCESSFUL, stats);
       } else {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
@@ -881,24 +924,36 @@ public class StripedBlockUtil {
 
     public final int index;
     public final int state;
+    private final BlockReadStats readStats;
 
     public StripingChunkReadResult(int state) {
       Preconditions.checkArgument(state == TIMEOUT,
           "Only timeout result should return negative index.");
       this.index = -1;
       this.state = state;
+      this.readStats = null;
     }
 
     public StripingChunkReadResult(int index, int state) {
+      this(index, state, null);
+    }
+
+    public StripingChunkReadResult(int index, int state,
+        BlockReadStats stats) {
       Preconditions.checkArgument(state != TIMEOUT,
           "Timeout result should return negative index.");
       this.index = index;
       this.state = state;
+      this.readStats = stats;
+    }
+
+    public BlockReadStats getReadStats() {
+      return readStats;
     }
 
     @Override
     public String toString() {
-      return "(index=" + index + ", state =" + state + ")";
+      return "(index=" + index + ", state =" + state + ", readStats ="
+          + readStats + ")";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 45e29ff..f9063b7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
@@ -161,7 +162,7 @@ public final class ErasureCodingWorker {
     return conf;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return new ExecutorCompletionService<>(stripedReadPool);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index cbef318..0db8a6f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -161,16 +162,15 @@ class StripedBlockReader {
     }
   }
 
-  Callable<Void> readFromBlock(final int length,
+  Callable<BlockReadStats> readFromBlock(final int length,
                                final CorruptedBlocks corruptedBlocks) {
-    return new Callable<Void>() {
+    return new Callable<BlockReadStats>() {
 
       @Override
-      public Void call() throws Exception {
+      public BlockReadStats call() throws Exception {
         try {
           getReadBuffer().limit(length);
-          actualReadFromBlock();
-          return null;
+          return actualReadFromBlock();
         } catch (ChecksumException e) {
           LOG.warn("Found Checksum error for {} from {} at {}", block,
               source, e.getPos());
@@ -187,7 +187,7 @@ class StripedBlockReader {
   /**
    * Perform actual reading of bytes from block.
    */
-  private void actualReadFromBlock() throws IOException {
+  private BlockReadStats actualReadFromBlock() throws IOException {
     int len = buffer.remaining();
     int n = 0;
     while (n < len) {
@@ -198,6 +198,8 @@ class StripedBlockReader {
       n += nread;
       stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
     }
+    return new BlockReadStats(n, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
   }
 
   // close block reader

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 96f9791..98edf72 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
@@ -80,8 +81,8 @@ class StripedReader {
 
   private final List<StripedBlockReader> readers;
 
-  private final Map<Future<Void>, Integer> futures = new HashMap<>();
-  private final CompletionService<Void> readService;
+  private final Map<Future<BlockReadStats>, Integer> futures = new HashMap<>();
+  private final CompletionService<BlockReadStats> readService;
 
   StripedReader(StripedReconstructor reconstructor, DataNode datanode,
       Configuration conf, StripedReconstructionInfo stripedReconInfo) {
@@ -289,9 +290,9 @@ class StripedReader {
       int toRead = getReadLength(liveIndices[successList[i]],
           reconstructLength);
       if (toRead > 0) {
-        Callable<Void> readCallable =
+        Callable<BlockReadStats> readCallable =
             reader.readFromBlock(toRead, corruptedBlocks);
-        Future<Void> f = readService.submit(readCallable);
+        Future<BlockReadStats> f = readService.submit(readCallable);
         futures.put(f, successList[i]);
       } else {
         // If the read length is 0, we don't need to do real read
@@ -411,9 +412,9 @@ class StripedReader {
 
     // step3: schedule if find a correct source DN and need to do real read.
     if (reader != null) {
-      Callable<Void> readCallable =
+      Callable<BlockReadStats> readCallable =
           reader.readFromBlock(toRead, corruptedBlocks);
-      Future<Void> f = readService.submit(readCallable);
+      Future<BlockReadStats> f = readService.submit(readCallable);
       futures.put(f, m);
       used.set(m);
     }
@@ -422,8 +423,8 @@ class StripedReader {
   }
 
   // Cancel all reads.
-  private static void cancelReads(Collection<Future<Void>> futures) {
-    for (Future<Void> future : futures) {
+  private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
+    for (Future<BlockReadStats> future : futures) {
       future.cancel(true);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index bbffcf5..a1f4c7f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
@@ -222,7 +223,7 @@ abstract class StripedReconstructor {
     return cachingStrategy;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return erasureCodingWorker.createReadService();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
index 0a3010f..1a2c4de 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
@@ -21,16 +21,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -61,6 +66,9 @@ public class TestDistributedFileSystemWithECFile {
     return StripedFileTestUtil.getDefaultECPolicy();
   }
 
+  @Rule
+  public final Timeout globalTimeout = new Timeout(60000 * 3);
+
   @Before
   public void setup() throws IOException {
     ecPolicy = getEcPolicy();
@@ -249,4 +257,40 @@ public class TestDistributedFileSystemWithECFile {
     assertEquals(rs63, fs.getErasureCodingPolicy(ecFile));
     assertEquals(rs32, fs.getErasureCodingPolicy(ecFile2));
   }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testStatistics() throws Exception {
+    final String fileName = "/ec/file";
+    final int size = 3200;
+    createFile(fileName, size);
+    InputStream in = null;
+    try {
+      in = fs.open(new Path(fileName));
+      IOUtils.copyBytes(in, System.out, 4096, false);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+
+    // verify stats are correct
+    Long totalBytesRead = 0L;
+    Long ecBytesRead = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      totalBytesRead += stat.getBytesRead();
+      ecBytesRead += stat.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesRead);
+    assertEquals(Long.valueOf(size), ecBytesRead);
+
+    // verify thread local stats are correct
+    Long totalBytesReadThread = 0L;
+    Long ecBytesReadThread = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      FileSystem.Statistics.StatisticsData data = stat.getThreadStatistics();
+      totalBytesReadThread += data.getBytesRead();
+      ecBytesReadThread += data.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesReadThread);
+    assertEquals(Long.valueOf(size), ecBytesReadThread);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to