Repository: hadoop
Updated Branches:
  refs/heads/trunk 1043795f7 -> 08bb6c49a
HDFS-13926. ThreadLocal aggregations for FileSystem.Statistics are incorrect with striped reads.

Contributed by Xiao Chen, Hrishikesh Gadre.

Signed-off-by: Xiao Chen <x...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08bb6c49
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08bb6c49
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08bb6c49

Branch: refs/heads/trunk
Commit: 08bb6c49a5aec32b7d9f29238560f947420405d6
Parents: 1043795
Author: Hrishikesh Gadre <hga...@apache.org>
Authored: Mon Oct 8 20:30:53 2018 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Mon Oct 8 20:31:57 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  8 +++
 .../hadoop/hdfs/DFSStripedInputStream.java      | 22 +++++++
 .../org/apache/hadoop/hdfs/ReaderStrategy.java  | 15 -----
 .../org/apache/hadoop/hdfs/StripeReader.java    | 23 ++++---
 .../apache/hadoop/hdfs/util/IOUtilsClient.java  | 10 ++-
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 65 ++++++++++++++++++--
 .../erasurecode/ErasureCodingWorker.java        |  3 +-
 .../erasurecode/StripedBlockReader.java         | 14 +++--
 .../datanode/erasurecode/StripedReader.java     | 17 ++---
 .../erasurecode/StripedReconstructor.java       |  3 +-
 .../TestDistributedFileSystemWithECFile.java    | 44 +++++++++++++
 11 files changed, 178 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e5640d2..52ed1d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -90,6 +90,8 @@ import com.google.common.annotations.VisibleForTesting;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles
  * negotiation of the namenode and various datanodes as necessary.
@@ -769,6 +771,12 @@ public class DFSInputStream extends FSInputStream
           // got a EOS from reader though we expect more data on it.
           throw new IOException("Unexpected EOS from the reader");
         }
+        updateReadStatistics(readStatistics, result, blockReader);
+        dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+            result);
+        if (readStatistics.getBlockType() == BlockType.STRIPED) {
+          dfsClient.updateFileSystemECReadStats(result);
+        }
         return result;
       } catch (ChecksumException ce) {
         throw ce;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 5557a50..3f688d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -54,6 +54,8 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /**
  * DFSStripedInputStream reads from striped block groups.
  */
@@ -329,6 +331,26 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   /**
+   * Update read statistics. Note that this has to be done on the thread that
+   * initiates the read, rather than inside each async thread, for
+   * {@link org.apache.hadoop.fs.FileSystem.Statistics} to work correctly with
+   * its ThreadLocal.
+   *
+   * @param stats striped read stats
+   */
+  void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
+    if (stats == null) {
+      return;
+    }
+    updateReadStatistics(readStatistics, stats.getBytesRead(),
+        stats.isShortCircuit(), stats.getNetworkDistance());
+    dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
+        stats.getBytesRead());
+    assert readStatistics.getBlockType() == BlockType.STRIPED;
+    dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
+  }
+
+  /**
    * Seek to a new arbitrary location.
   */
  @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
index 39ad2ff..4d5e741 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java
@@ -17,11 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.hdfs.protocol.BlockType;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
 
 /**
  * Wraps different possible read implementations so that callers can be
@@ -120,12 +117,6 @@ class ByteArrayStrategy implements ReaderStrategy {
                       int length) throws IOException {
     int nRead = blockReader.read(readBuf, offset, length);
     if (nRead > 0) {
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
       offset += nRead;
     }
     return nRead;
@@ -190,12 +181,6 @@ class ByteBufferStrategy implements ReaderStrategy {
     // Only when data are read, update the position
     if (nRead > 0) {
       readBuf.position(readBuf.position() + nRead);
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
     }
     return nRead;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 0554ebe..e90af84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
@@ -105,9 +106,10 @@ abstract class StripeReader {
     }
   }
 
-  protected final Map<Future<Void>, Integer> futures = new HashMap<>();
+  private final Map<Future<BlockReadStats>, Integer> futures =
+      new HashMap<>();
   protected final AlignedStripe alignedStripe;
-  protected final CompletionService<Void> service;
+  private final CompletionService<BlockReadStats> service;
   protected final LocatedBlock[] targetBlocks;
   protected final CorruptedBlocks corruptedBlocks;
   protected final BlockReaderInfo[] readerInfos;
@@ -257,7 +259,7 @@ abstract class StripeReader {
     }
   }
 
-  private Callable<Void> readCells(final BlockReader reader,
+  private Callable<BlockReadStats> readCells(final BlockReader reader,
       final DatanodeInfo datanode, final long currentReaderOffset,
       final long targetReaderOffset, final ByteBufferStrategy[] strategies,
       final ExtendedBlock currentBlock) {
@@ -275,10 +277,13 @@ abstract class StripeReader {
             skipped == targetReaderOffset - currentReaderOffset);
       }
 
+      int ret = 0;
       for (ByteBufferStrategy strategy : strategies) {
-        readToBuffer(reader, datanode, strategy, currentBlock);
+        int bytesRead = readToBuffer(reader, datanode, strategy, currentBlock);
+        ret += bytesRead;
       }
-      return null;
+      return new BlockReadStats(ret, reader.isShortCircuit(),
+          reader.getNetworkDistance());
     };
   }
 
@@ -303,13 +308,14 @@ abstract class StripeReader {
     }
 
     chunk.state = StripingChunk.PENDING;
-    Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
+    Callable<BlockReadStats> readCallable =
+        readCells(readerInfos[chunkIndex].reader,
         readerInfos[chunkIndex].datanode,
         readerInfos[chunkIndex].blockReaderOffset,
         alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
         block.getBlock());
-    Future<Void> request = service.submit(readCallable);
+    Future<BlockReadStats> request = service.submit(readCallable);
     futures.put(request, chunkIndex);
     return true;
   }
@@ -342,6 +348,7 @@ abstract class StripeReader {
       try {
         StripingChunkReadResult r = StripedBlockUtil
             .getNextCompletedStripedRead(service, futures, 0);
+        dfsStripedInputStream.updateReadStats(r.getReadStats());
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
               + alignedStripe);
@@ -460,7 +467,7 @@ abstract class StripeReader {
   }
 
   void clearFutures() {
-    for (Future<Void> future : futures.keySet()) {
+    for (Future future : futures.keySet()) {
       future.cancel(false);
     }
     futures.clear();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
index 85e9cee..25d8a0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -48,13 +48,19 @@ public class IOUtilsClient {
 
   public static void updateReadStatistics(ReadStatistics readStatistics,
       int nRead, BlockReader blockReader) {
+    updateReadStatistics(readStatistics, nRead, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
+  }
+
+  public static void updateReadStatistics(ReadStatistics readStatistics,
+      int nRead, boolean isShortCircuit, int networkDistance) {
     if (nRead <= 0) {
       return;
     }
 
-    if (blockReader.isShortCircuit()) {
+    if (isShortCircuit) {
       readStatistics.addShortCircuitBytes(nRead);
-    } else if (blockReader.getNetworkDistance() == 0) {
+    } else if (networkDistance == 0) {
       readStatistics.addLocalBytes(nRead);
     } else {
       readStatistics.addRemoteBytes(nRead);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 0425927..2245757 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -77,6 +77,48 @@ public class StripedBlockUtil {
       LoggerFactory.getLogger(StripedBlockUtil.class);
 
   /**
+   * Struct holding the read statistics. This is used when reads are done
+   * asynchronously, to allow the async threads to return the read stats and
+   * let the main reading thread update them. This is important for the main
+   * reading thread's ThreadLocal stats to be correct.
+   */
+  public static class BlockReadStats {
+    private final int bytesRead;
+    private final boolean isShortCircuit;
+    private final int networkDistance;
+
+    public BlockReadStats(int numBytesRead, boolean shortCircuit,
+        int distance) {
+      bytesRead = numBytesRead;
+      isShortCircuit = shortCircuit;
+      networkDistance = distance;
+    }
+
+    public int getBytesRead() {
+      return bytesRead;
+    }
+
+    public boolean isShortCircuit() {
+      return isShortCircuit;
+    }
+
+    public int getNetworkDistance() {
+      return networkDistance;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("bytesRead=").append(bytesRead);
+      sb.append(',');
+      sb.append("isShortCircuit=").append(isShortCircuit);
+      sb.append(',');
+      sb.append("networkDistance=").append(networkDistance);
+      return sb.toString();
+    }
+  }
+
+  /**
    * This method parses a striped block group into individual blocks.
    *
    * @param bg The striped block group
@@ -245,10 +287,11 @@ public class StripedBlockUtil {
    * @throws InterruptedException
    */
   public static StripingChunkReadResult getNextCompletedStripedRead(
-      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
+      CompletionService<BlockReadStats> readService,
+      Map<Future<BlockReadStats>, Integer> futures,
       final long timeoutMillis) throws InterruptedException {
     Preconditions.checkArgument(!futures.isEmpty());
-    Future<Void> future = null;
+    Future<BlockReadStats> future = null;
     try {
       if (timeoutMillis > 0) {
         future = readService.poll(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -256,9 +299,9 @@ public class StripedBlockUtil {
         future = readService.take();
       }
       if (future != null) {
-        future.get();
+        final BlockReadStats stats = future.get();
         return new StripingChunkReadResult(futures.remove(future),
-            StripingChunkReadResult.SUCCESSFUL);
+            StripingChunkReadResult.SUCCESSFUL, stats);
       } else {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
@@ -881,24 +924,36 @@ public class StripedBlockUtil {
 
     public final int index;
     public final int state;
+    private final BlockReadStats readStats;
 
     public StripingChunkReadResult(int state) {
       Preconditions.checkArgument(state == TIMEOUT,
           "Only timeout result should return negative index.");
       this.index = -1;
       this.state = state;
+      this.readStats = null;
     }
 
     public StripingChunkReadResult(int index, int state) {
+      this(index, state, null);
+    }
+
+    public StripingChunkReadResult(int index, int state, BlockReadStats stats) {
       Preconditions.checkArgument(state != TIMEOUT,
           "Timeout result should return negative index.");
       this.index = index;
       this.state = state;
+      this.readStats = stats;
+    }
+
+    public BlockReadStats getReadStats() {
+      return readStats;
     }
 
     @Override
     public String toString() {
-      return "(index=" + index + ", state =" + state + ")";
+      return "(index=" + index + ", state =" + state + ", readStats ="
+          + readStats + ")";
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 45e29ff..f9063b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
@@ -161,7 +162,7 @@ public final class ErasureCodingWorker {
     return conf;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return new ExecutorCompletionService<>(stripedReadPool);
   }
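[Editorial note: the recurring pattern in the hunks above — async read tasks returning a BlockReadStats value that the submitting thread folds into its own counters — is what makes the ThreadLocal-backed FileSystem.Statistics come out right. Below is a minimal standalone sketch of that pattern using only JDK types; the class and field names are illustrative, not the HDFS classes in this patch.]

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalStatsSketch {
  // Per-thread byte counter, standing in for FileSystem.Statistics'
  // ThreadLocal-backed StatisticsData.
  private static final ThreadLocal<long[]> BYTES_READ =
      ThreadLocal.withInitial(() -> new long[1]);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);

    // Pre-HDFS-13926 behavior (broken): each task bumped the counter itself,
    // so the bytes landed in the pool threads' ThreadLocals and the caller's
    // counter stayed at 0. Post-fix behavior: tasks only *return* how much
    // they read, like Callable<BlockReadStats> in the diffs above.
    final int tasks = 3;
    for (int i = 0; i < tasks; i++) {
      Callable<Integer> read = () -> 1024; // pretend each chunk read 1024 bytes
      service.submit(read);
    }

    // The initiating thread collects the results and updates its *own*
    // ThreadLocal counter, mirroring DFSStripedInputStream#updateReadStats.
    for (int i = 0; i < tasks; i++) {
      BYTES_READ.get()[0] += service.take().get();
    }
    System.out.println("caller-thread bytesRead = " + BYTES_READ.get()[0]); // 3072
    pool.shutdown();
  }
}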

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index cbef318..0db8a6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -161,16 +162,15 @@ class StripedBlockReader {
     }
   }
 
-  Callable<Void> readFromBlock(final int length,
+  Callable<BlockReadStats> readFromBlock(final int length,
       final CorruptedBlocks corruptedBlocks) {
-    return new Callable<Void>() {
+    return new Callable<BlockReadStats>() {
 
       @Override
-      public Void call() throws Exception {
+      public BlockReadStats call() throws Exception {
         try {
           getReadBuffer().limit(length);
-          actualReadFromBlock();
-          return null;
+          return actualReadFromBlock();
         } catch (ChecksumException e) {
           LOG.warn("Found Checksum error for {} from {} at {}", block,
               source, e.getPos());
@@ -187,7 +187,7 @@ class StripedBlockReader {
   /**
    * Perform actual reading of bytes from block.
   */
-  private void actualReadFromBlock() throws IOException {
+  private BlockReadStats actualReadFromBlock() throws IOException {
     int len = buffer.remaining();
     int n = 0;
     while (n < len) {
@@ -198,6 +198,8 @@ class StripedBlockReader {
       n += nread;
       stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
     }
+    return new BlockReadStats(n, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
   }
 
   // close block reader

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 96f9791..98edf72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
@@ -80,8 +81,8 @@ class StripedReader {
 
   private final List<StripedBlockReader> readers;
 
-  private final Map<Future<Void>, Integer> futures = new HashMap<>();
-  private final CompletionService<Void> readService;
+  private final Map<Future<BlockReadStats>, Integer> futures = new HashMap<>();
+  private final CompletionService<BlockReadStats> readService;
 
   StripedReader(StripedReconstructor reconstructor, DataNode datanode,
       Configuration conf, StripedReconstructionInfo stripedReconInfo) {
@@ -289,9 +290,9 @@ class StripedReader {
       int toRead = getReadLength(liveIndices[successList[i]],
           reconstructLength);
       if (toRead > 0) {
-        Callable<Void> readCallable =
+        Callable<BlockReadStats> readCallable =
             reader.readFromBlock(toRead, corruptedBlocks);
-        Future<Void> f = readService.submit(readCallable);
+        Future<BlockReadStats> f = readService.submit(readCallable);
         futures.put(f, successList[i]);
       } else {
         // If the read length is 0, we don't need to do real read
@@ -411,9 +412,9 @@ class StripedReader {
 
     // step3: schedule if find a correct source DN and need to do real read.
     if (reader != null) {
-      Callable<Void> readCallable =
+      Callable<BlockReadStats> readCallable =
           reader.readFromBlock(toRead, corruptedBlocks);
-      Future<Void> f = readService.submit(readCallable);
+      Future<BlockReadStats> f = readService.submit(readCallable);
       futures.put(f, m);
       used.set(m);
     }
@@ -422,8 +423,8 @@ class StripedReader {
   }
 
   // Cancel all reads.
-  private static void cancelReads(Collection<Future<Void>> futures) {
-    for (Future<Void> future : futures) {
+  private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
+    for (Future<BlockReadStats> future : futures) {
       future.cancel(true);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index bbffcf5..a1f4c7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
@@ -222,7 +223,7 @@ abstract class StripedReconstructor {
     return cachingStrategy;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return erasureCodingWorker.createReadService();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08bb6c49/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
index 0a3010f..1a2c4de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java
@@ -21,16 +21,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -61,6 +66,9 @@ public class TestDistributedFileSystemWithECFile {
     return StripedFileTestUtil.getDefaultECPolicy();
   }
 
+  @Rule
+  public final Timeout globalTimeout = new Timeout(60000 * 3);
+
   @Before
   public void setup() throws IOException {
     ecPolicy = getEcPolicy();
@@ -249,4 +257,40 @@ public class TestDistributedFileSystemWithECFile {
     assertEquals(rs63, fs.getErasureCodingPolicy(ecFile));
     assertEquals(rs32, fs.getErasureCodingPolicy(ecFile2));
   }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testStatistics() throws Exception {
+    final String fileName = "/ec/file";
+    final int size = 3200;
+    createFile(fileName, size);
+    InputStream in = null;
+    try {
+      in = fs.open(new Path(fileName));
+      IOUtils.copyBytes(in, System.out, 4096, false);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+
+    // verify stats are correct
+    Long totalBytesRead = 0L;
+    Long ecBytesRead = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      totalBytesRead += stat.getBytesRead();
+      ecBytesRead += stat.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesRead);
+    assertEquals(Long.valueOf(size), ecBytesRead);
+
+    // verify thread local stats are correct
+    Long totalBytesReadThread = 0L;
+    Long ecBytesReadThread = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      FileSystem.Statistics.StatisticsData data = stat.getThreadStatistics();
+      totalBytesReadThread += data.getBytesRead();
+      ecBytesReadThread += data.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesReadThread);
+    assertEquals(Long.valueOf(size), ecBytesReadThread);
+  }
 }
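[Editorial note: the two loops in testStatistics() generalize into a small application-side helper for checking how many erasure-coded bytes the current thread has read. This is only an illustrative sketch built on the APIs the test itself exercises (FileSystem.getAllStatistics() is deprecated but still functional); the EcReadStats class and its method name are hypothetical.]

import org.apache.hadoop.fs.FileSystem;

// Hypothetical helper mirroring the assertions in testStatistics() above.
final class EcReadStats {
  private EcReadStats() {
  }

  /**
   * Sums the erasure-coded bytes read by the *calling* thread across all
   * tracked FileSystem instances. Because the counters are ThreadLocal,
   * this only sees reads whose stats were applied on this thread -- the
   * very property HDFS-13926 restores for striped reads.
   */
  @SuppressWarnings("deprecation")
  static long threadLocalEcBytesRead() {
    long ecBytes = 0;
    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
      ecBytes += stat.getThreadStatistics().getBytesReadErasureCoded();
    }
    return ecBytes;
  }
}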