HDFS-8901. Use ByteBuffer in striping positional read. Contributed by Sammi Chen and Kai Zheng.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/401db4fc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/401db4fc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/401db4fc

Branch: refs/heads/YARN-3368
Commit: 401db4fc65140979fe7665983e36905e886df971
Parents: 20a20c2
Author: Zhe Zhang <z...@apache.org>
Authored: Thu Sep 8 11:54:33 2016 -0700
Committer: Zhe Zhang <z...@apache.org>
Committed: Thu Sep 8 11:54:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/util/DataChecksum.java    |   2 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  68 ++++---
 .../hadoop/hdfs/DFSStripedInputStream.java      |  24 ++-
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 177 ++++++++++---------
 .../hadoop/hdfs/TestDFSStripedInputStream.java  | 121 ++++++++++++-
 .../hadoop/hdfs/util/TestStripedBlockUtil.java  |  22 +--
 6 files changed, 281 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 3a53ed9..6982a92 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -304,7 +304,7 @@ public class DataChecksum implements Checksum {
           bytesPerChecksum, checksums.array(), crcsOffset, fileName, basePos);
       return;
     }
-    if (NativeCrc32.isAvailable()) {
+    if (NativeCrc32.isAvailable() && data.isDirect()) {
       NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
           fileName, basePos);
     } else {
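
A note on the guard added above: the native CRC path needs a buffer whose memory address can be handed to JNI code, which only direct ByteBuffers provide; heap buffers (e.g. those created by ByteBuffer.wrap) must take the Java fallback. A minimal standalone sketch of that dispatch pattern, with hypothetical verifyNative/verifyJava helpers standing in for the real implementations:

    import java.nio.ByteBuffer;

    class ChecksumDispatchSketch {
      // Route direct buffers to a native verifier, heap buffers to a
      // pure-Java fallback; mirrors the isDirect() guard in the hunk above.
      static void verifyChunkedSums(ByteBuffer data) {
        if (data.isDirect()) {
          verifyNative(data);  // the buffer has a stable native address
        } else {
          verifyJava(data);    // no native address; stay in Java
        }
      }
      static void verifyNative(ByteBuffer data) { /* JNI call elided */ }
      static void verifyJava(ByteBuffer data) { /* pure-Java CRC elided */ }
    }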

http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 7a10ba4..31fa897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -533,7 +533,8 @@ public class DFSInputStream extends FSInputStream
    * Open a DataInputStream to a DataNode so that it can be read from.
   * We get block ID and the IDs of the destinations at startup, from the namenode.
    */
-  private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
+  private synchronized DatanodeInfo blockSeekTo(long target)
+      throws IOException {
     if (target >= getFileLength()) {
       throw new IOException("Attempted to read past end of file");
     }
@@ -962,14 +963,14 @@ public class DFSInputStream extends FSInputStream
   }
 
   protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
-      byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
+      ByteBuffer buf, CorruptedBlocks corruptedBlocks)
       throws IOException {
     block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       try {
         actualGetFromOneDataNode(addressPair, block, start, end,
-            buf, offset, corruptedBlocks);
+            buf, corruptedBlocks);
         return;
       } catch (IOException e) {
         checkInterrupted(e); // check if the read has been interrupted
@@ -988,12 +989,10 @@ public class DFSInputStream extends FSInputStream
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
-        byte[] buf = bb.array();
-        int offset = bb.position();
         try (TraceScope ignored = dfsClient.getTracer().
             newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
-          actualGetFromOneDataNode(datanode, block, start, end, buf,
-              offset, corruptedBlocks);
+          actualGetFromOneDataNode(datanode, block, start, end, bb,
+              corruptedBlocks);
           return bb;
         }
       }
@@ -1007,13 +1006,12 @@ public class DFSInputStream extends FSInputStream
    * @param block             the located block containing the requested data
    * @param startInBlk        the startInBlk offset of the block
    * @param endInBlk          the endInBlk offset of the block
-   * @param buf               the given byte array into which the data is read
-   * @param offset            the offset in buf
+   * @param buf               the given byte buffer into which the data is read
    * @param corruptedBlocks   map recording list of datanodes with corrupted
    *                          block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
-      final long startInBlk, final long endInBlk, byte[] buf, int offset,
+      final long startInBlk, final long endInBlk, ByteBuffer buf,
                                 CorruptedBlocks corruptedBlocks)
       throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
@@ -1031,7 +1029,22 @@ public class DFSInputStream extends FSInputStream
         DFSClientFaultInjector.get().fetchFromDatanodeException();
         reader = getBlockReader(block, startInBlk, len, datanode.addr,
             datanode.storageType, datanode.info);
-        int nread = reader.readAll(buf, offset, len);
+
+        // Behave exactly like the readAll() call.
+        ByteBuffer tmp = buf.duplicate();
+        tmp.limit(tmp.position() + len);
+        tmp = tmp.slice();
+        int nread = 0;
+        int ret;
+        while (true) {
+          ret = reader.read(tmp);
+          if (ret <= 0) {
+            break;
+          }
+          nread += ret;
+        }
+        buf.position(buf.position() + nread);
+
         IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
         dfsClient.updateFileSystemReadStats(
             reader.getNetworkDistance(), nread);
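
The duplicate/limit/slice sequence above builds a bounded window over buf so the read loop can fill exactly len bytes without disturbing buf's own limit or mark; only buf's position is advanced afterwards. A self-contained sketch of the same pattern against a generic channel (the names here are illustrative, not Hadoop APIs):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    class BoundedReadSketch {
      // Read up to len bytes into buf at its current position, advancing
      // buf.position() by the number of bytes actually read.
      static int readFully(ReadableByteChannel ch, ByteBuffer buf, int len)
          throws IOException {
        ByteBuffer tmp = buf.duplicate();  // shared content, independent cursor
        tmp.limit(tmp.position() + len);   // cap the window at len bytes
        tmp = tmp.slice();                 // zero-based view of the window
        int nread = 0;
        while (tmp.hasRemaining()) {
          int ret = ch.read(tmp);
          if (ret <= 0) {
            break;                         // EOF before the window filled
          }
          nread += ret;
        }
        buf.position(buf.position() + nread);
        return nread;
      }
    }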
@@ -1098,7 +1111,7 @@ public class DFSInputStream extends FSInputStream
   * time. We then wait on whichever read returns first.
    */
   private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
-      long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
+      long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
       throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
@@ -1130,8 +1143,8 @@ public class DFSInputStream extends FSInputStream
               conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
           if (future != null) {
             ByteBuffer result = future.get();
-            System.arraycopy(result.array(), result.position(), buf, offset,
-                len);
+            result.flip();
+            buf.put(result);
             return;
           }
           DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@@ -1173,8 +1186,8 @@ public class DFSInputStream extends FSInputStream
           // cancel the rest.
           cancelAll(futures);
           dfsClient.getHedgedReadMetrics().incHedgedReadWins();
-          System.arraycopy(result.array(), result.position(), buf, offset,
-              len);
+          result.flip();
+          buf.put(result);
           return;
         } catch (InterruptedException ie) {
           // Ignore and retry
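
The flip()/put() pair in both hunks replaces the earlier System.arraycopy: the hedged read fills result, flip() switches it from write mode to read mode (limit = position, position = 0), and buf.put(result) bulk-copies the readable bytes while advancing both cursors. This also works when result is a direct buffer, where the old array() access would fail. A tiny sketch of the idiom:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    class FlipPutSketch {
      public static void main(String[] args) {
        ByteBuffer result = ByteBuffer.allocate(16);
        result.put("hedged".getBytes(StandardCharsets.US_ASCII)); // fill
        result.flip();                      // now readable: pos=0, limit=6
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(result);                    // bulk copy, advances both
        System.out.println(buf.position()); // prints 6
      }
    }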
@@ -1244,7 +1257,8 @@ public class DFSInputStream extends FSInputStream
      * access key from its memory since it's considered expired based on
      * the estimated expiration date.
      */
-    if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+    if (ex instanceof InvalidBlockTokenException ||
+        ex instanceof InvalidToken) {
       DFSClient.LOG.info("Access token was invalid when connecting to "
           + targetAddr + " : " + ex);
       return true;
@@ -1272,7 +1286,8 @@ public class DFSInputStream extends FSInputStream
     try (TraceScope scope = dfsClient.
         newReaderTraceScope("DFSInputStream#byteArrayPread",
             src, position, length)) {
-      int retLen = pread(position, buffer, offset, length);
+      ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
+      int retLen = pread(position, bb);
       if (retLen < length) {
         dfsClient.addRetLenToReaderScope(scope, retLen);
       }
@@ -1280,7 +1295,7 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  private int pread(long position, byte[] buffer, int offset, int length)
+  private int pread(long position, ByteBuffer buffer)
       throws IOException {
     // sanity checks
     dfsClient.checkOpen();
@@ -1292,6 +1307,7 @@ public class DFSInputStream extends FSInputStream
     if ((position < 0) || (position >= filelen)) {
       return -1;
     }
+    int length = buffer.remaining();
     int realLen = length;
     if ((position + length) > filelen) {
       realLen = (int)(filelen - position);
@@ -1304,14 +1320,16 @@ public class DFSInputStream extends FSInputStream
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     for (LocatedBlock blk : blockRange) {
       long targetStart = position - blk.getStartOffset();
-      long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
+      int bytesToRead = (int) Math.min(remaining,
+          blk.getBlockSize() - targetStart);
+      long targetEnd = targetStart + bytesToRead - 1;
       try {
         if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
           hedgedFetchBlockByteRange(blk, targetStart,
-              targetStart + bytesToRead - 1, buffer, offset, corruptedBlocks);
+              targetEnd, buffer, corruptedBlocks);
         } else {
-          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
-              buffer, offset, corruptedBlocks);
+          fetchBlockByteRange(blk, targetStart, targetEnd,
+              buffer, corruptedBlocks);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
@@ -1323,7 +1341,6 @@ public class DFSInputStream extends FSInputStream
 
       remaining -= bytesToRead;
       position += bytesToRead;
-      offset += bytesToRead;
     }
     assert remaining == 0 : "Wrong number of bytes read.";
     return realLen;
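
With pread taking a ByteBuffer, the byte[] overload above simply wraps its array once: ByteBuffer.wrap(buffer, offset, length) yields a buffer whose position is offset and whose limit is offset + length, so buffer.remaining() recovers length and each per-block fetch advances the shared position. That is why the manual offset += bytesToRead bookkeeping could be dropped. A sketch of the invariant:

    import java.nio.ByteBuffer;

    class WrapSketch {
      public static void main(String[] args) {
        byte[] arr = new byte[100];
        ByteBuffer bb = ByteBuffer.wrap(arr, 10, 30);
        System.out.println(bb.position());  // 10 (== offset)
        System.out.println(bb.remaining()); // 30 (== length)
        bb.position(bb.position() + 12);    // as if a fetch consumed 12 bytes
        System.out.println(bb.remaining()); // 18 left for the next block
      }
    }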
@@ -1457,7 +1474,8 @@ public class DFSInputStream extends FSInputStream
    * If another node could not be found, then returns false.
    */
   @Override
-  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+  public synchronized boolean seekToNewSource(long targetPos)
+      throws IOException {
     if (currentNode == null) {
       return seekToBlockSource(targetPos);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 9ca8005..ccaf6a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -307,8 +307,8 @@ public class DFSStripedInputStream extends DFSInputStream {
         stripeLimit - stripeBufOffset);
 
     LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
-    AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize,
-        blockGroup, offsetInBlockGroup,
+    AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
+        cellSize, blockGroup, offsetInBlockGroup,
         offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
@@ -523,13 +523,13 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   @Override
   protected void fetchBlockByteRange(LocatedBlock block, long start,
-      long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
+      long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
       throws IOException {
     // Refresh the striped block group
     LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
 
     AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
-        ecPolicy, cellSize, blockGroup, start, end, buf, offset);
+        ecPolicy, cellSize, blockGroup, start, end, buf);
     CompletionService<Void> readService = new ExecutorCompletionService<>(
         dfsClient.getStripedReadsThreadPool());
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
@@ -542,6 +542,7 @@ public class DFSStripedInputStream extends DFSInputStream {
             blks, preaderInfos, corruptedBlocks);
         preader.readStripe();
       }
+      buf.position(buf.position() + (int)(end - start + 1));
     } finally {
       for (BlockReaderInfo preaderInfo : preaderInfos) {
         closeReader(preaderInfo);
@@ -698,16 +699,15 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
 
     private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
-      if (chunk.byteBuffer != null) {
-        ByteBufferStrategy strategy =
-            new ByteBufferStrategy(chunk.byteBuffer, readStatistics, dfsClient);
+      if (chunk.useByteBuffer()) {
+        ByteBufferStrategy strategy = new ByteBufferStrategy(
+            chunk.getByteBuffer(), readStatistics, dfsClient);
         return new ByteBufferStrategy[]{strategy};
       } else {
         ByteBufferStrategy[] strategies =
-            new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
+            new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
         for (int i = 0; i < strategies.length; i++) {
-          ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
-              chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
+          ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
           strategies[i] =
               new ByteBufferStrategy(buffer, readStatistics, dfsClient);
         }
@@ -814,7 +814,7 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   class PositionStripeReader extends StripeReader {
-    private byte[][] decodeInputs = null;
+    private ByteBuffer[] decodeInputs = null;
 
     PositionStripeReader(CompletionService<Void> service,
         AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
@@ -836,8 +836,6 @@ public class DFSStripedInputStream extends DFSInputStream {
       Preconditions.checkState(index >= dataBlkNum &&
           alignedStripe.chunks[index] == null);
       alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
-      alignedStripe.chunks[index].addByteArraySlice(0,
-          (int) alignedStripe.getSpanInBlock());
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index c8827d9..4dbbc3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -73,7 +73,8 @@ import java.util.concurrent.TimeUnit;
 @InterfaceAudience.Private
 public class StripedBlockUtil {
 
-  public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(StripedBlockUtil.class);
 
   /**
    * Parses a striped block group into individual blocks.
@@ -312,16 +313,17 @@ public class StripedBlockUtil {
    * schedule a new fetch request with the decoding input buffer as transfer
    * destination.
    */
-  public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
+  public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe,
       int dataBlkNum, int parityBlkNum) {
-    byte[][] decodeInputs =
-        new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
+    ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+    for (int i = 0; i < decodeInputs.length; i++) {
+      decodeInputs[i] = ByteBuffer.allocate(
+          (int) alignedStripe.getSpanInBlock());
+    }
     // read the full data aligned stripe
     for (int i = 0; i < dataBlkNum; i++) {
       if (alignedStripe.chunks[i] == null) {
         alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
-        alignedStripe.chunks[i].addByteArraySlice(0,
-            (int) alignedStripe.getSpanInBlock());
       }
     }
     return decodeInputs;
@@ -334,14 +336,21 @@ public class StripedBlockUtil {
    * When all pending requests have returned, this method should be called to
    * finalize decode input buffers.
    */
-  public static void finalizeDecodeInputs(final byte[][] decodeInputs,
+  public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs,
                                           AlignedStripe alignedStripe) {
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
       final StripingChunk chunk = alignedStripe.chunks[i];
       if (chunk != null && chunk.state == StripingChunk.FETCHED) {
-        chunk.copyTo(decodeInputs[i]);
+        if (chunk.useChunkBuffer()) {
+          chunk.getChunkBuffer().copyTo(decodeInputs[i]);
+        } else {
+          chunk.getByteBuffer().flip();
+        }
       } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
-        Arrays.fill(decodeInputs[i], (byte) 0);
+        // Zero it out; this will be handled better in a follow-up issue.
+        byte[] emptyBytes = new byte[decodeInputs[i].limit()];
+        decodeInputs[i].put(emptyBytes);
+        decodeInputs[i].flip();
       } else {
         decodeInputs[i] = null;
       }
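
After finalizeDecodeInputs, every surviving input buffer is in read mode: fetched chunks are either copied in (copyTo flips its target) or flipped in place, and ALLZERO chunks are zero-filled and flipped, so the decoder can consume all inputs uniformly. A small sketch of the zero-fill-then-flip step:

    import java.nio.ByteBuffer;

    class ZeroFillSketch {
      public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(8);
        in.put(new byte[in.limit()]);       // write zeros up to the limit
        in.flip();                          // ready to read: pos=0, limit=8
        System.out.println(in.remaining()); // prints 8
      }
    }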
@@ -351,7 +360,7 @@ public class StripedBlockUtil {
   /**
    * Decode based on the given input buffers and erasure coding policy.
    */
-  public static void decodeAndFillBuffer(final byte[][] decodeInputs,
+  public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs,
       AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
       RawErasureDecoder decoder) {
     // Step 1: prepare indices and output buffers for missing data units
@@ -364,8 +373,11 @@ public class StripedBlockUtil {
       }
     }
     decodeIndices = Arrays.copyOf(decodeIndices, pos);
-    byte[][] decodeOutputs =
-        new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];
+    ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length];
+    for (int i = 0; i < decodeOutputs.length; i++) {
+      decodeOutputs[i] = ByteBuffer.allocate(
+          (int) alignedStripe.getSpanInBlock());
+    }
 
     // Step 2: decode into prepared output buffers
     decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
@@ -374,8 +386,8 @@ public class StripedBlockUtil {
     for (int i = 0; i < decodeIndices.length; i++) {
       int missingBlkIdx = decodeIndices[i];
       StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
-      if (chunk.state == StripingChunk.MISSING) {
-        chunk.copyFrom(decodeOutputs[i]);
+      if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
+        chunk.getChunkBuffer().copyFrom(decodeOutputs[i]);
       }
     }
   }
@@ -402,7 +414,8 @@ public class StripedBlockUtil {
 
     // Step 4: calculate each chunk's position in destination buffer. Since the
     // whole read range is within a single stripe, the logic is simpler here.
-    int bufOffset = (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum));
+    int bufOffset =
+        (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum));
     for (StripingCell cell : cells) {
       long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
       long cellEnd = cellStart + cell.size - 1;
@@ -437,15 +450,14 @@ public class StripedBlockUtil {
    * @param rangeStartInBlockGroup The byte range's start offset in block group
    * @param rangeEndInBlockGroup The byte range's end offset in block group
    * @param buf Destination buffer of the read operation for the byte range
-   * @param offsetInBuf Start offset into the destination buffer
    *
    * At most 5 stripes will be generated from each logical range, as
    * demonstrated in the header of {@link AlignedStripe}.
    */
-  public static AlignedStripe[] divideByteRangeIntoStripes(ErasureCodingPolicy ecPolicy,
+  public static AlignedStripe[] divideByteRangeIntoStripes(
+      ErasureCodingPolicy ecPolicy,
       int cellSize, LocatedStripedBlock blockGroup,
-      long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
-      int offsetInBuf) {
+      long rangeStartInBlockGroup, long rangeEndInBlockGroup, ByteBuffer buf) {
 
     // Step 0: analyze range and calculate basic parameters
     final int dataBlkNum = ecPolicy.getNumDataUnits();
@@ -462,7 +474,7 @@ public class StripedBlockUtil {
     AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
 
     // Step 4: calculate each chunk's position in destination buffer
-    calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);
+    calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
 
     // Step 5: prepare ALLZERO blocks
     prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
@@ -476,7 +488,8 @@ public class StripedBlockUtil {
    * used by {@link DFSStripedOutputStream} in encoding
    */
   @VisibleForTesting
-  private static StripingCell[] getStripingCellsOfByteRange(ErasureCodingPolicy ecPolicy,
+  private static StripingCell[] getStripingCellsOfByteRange(
+      ErasureCodingPolicy ecPolicy,
       int cellSize, LocatedStripedBlock blockGroup,
       long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
     Preconditions.checkArgument(
@@ -511,7 +524,8 @@ public class StripedBlockUtil {
    * the physical byte range (inclusive) on each stored internal block.
    */
   @VisibleForTesting
-  private static VerticalRange[] getRangesForInternalBlocks(ErasureCodingPolicy ecPolicy,
+  private static VerticalRange[] getRangesForInternalBlocks(
+      ErasureCodingPolicy ecPolicy,
       int cellSize, StripingCell[] cells) {
     int dataBlkNum = ecPolicy.getNumDataUnits();
     int parityBlkNum = ecPolicy.getNumParityUnits();
@@ -575,8 +589,7 @@ public class StripedBlockUtil {
   }
 
   private static void calcualteChunkPositionsInBuf(int cellSize,
-      AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
-      int offsetInBuf) {
+      AlignedStripe[] stripes, StripingCell[] cells, ByteBuffer buf) {
     /**
      *     | <--------------- AlignedStripe --------------->|
      *
@@ -598,6 +611,7 @@ public class StripedBlockUtil {
     for (StripingCell cell : cells) {
       long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
       long cellEnd = cellStart + cell.size - 1;
+      StripingChunk chunk;
       for (AlignedStripe s : stripes) {
         long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
         long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
@@ -606,11 +620,13 @@ public class StripedBlockUtil {
         if (overLapLen <= 0) {
           continue;
         }
-        if (s.chunks[cell.idxInStripe] == null) {
-          s.chunks[cell.idxInStripe] = new StripingChunk(buf);
+        chunk = s.chunks[cell.idxInStripe];
+        if (chunk == null) {
+          chunk = new StripingChunk();
+          s.chunks[cell.idxInStripe] = chunk;
         }
-        s.chunks[cell.idxInStripe].addByteArraySlice(
-            (int)(offsetInBuf + done + overlapStart - cellStart), overLapLen);
+        chunk.getChunkBuffer().addSlice(buf,
+            (int) (done + overlapStart - cellStart), overLapLen);
       }
       done += cell.size;
     }
@@ -833,88 +849,89 @@ public class StripedBlockUtil {
      */
     public int state = REQUESTED;
 
-    public final ChunkByteArray byteArray;
-    public final ByteBuffer byteBuffer;
+    private final ChunkByteBuffer chunkBuffer;
+    private final ByteBuffer byteBuffer;
 
-    public StripingChunk(byte[] buf) {
-      this.byteArray = new ChunkByteArray(buf);
+    public StripingChunk() {
+      this.chunkBuffer = new ChunkByteBuffer();
       byteBuffer = null;
     }
 
     public StripingChunk(ByteBuffer buf) {
-      this.byteArray = null;
+      this.chunkBuffer = null;
       this.byteBuffer = buf;
     }
 
     public StripingChunk(int state) {
-      this.byteArray = null;
+      this.chunkBuffer = null;
       this.byteBuffer = null;
       this.state = state;
     }
 
-    public void addByteArraySlice(int offset, int length) {
-      assert byteArray != null;
-      byteArray.offsetsInBuf.add(offset);
-      byteArray.lengthsInBuf.add(length);
+    public boolean useByteBuffer() {
+      return byteBuffer != null;
     }
 
-    void copyTo(byte[] target) {
-      assert byteArray != null;
-      byteArray.copyTo(target);
+    public boolean useChunkBuffer() {
+      return chunkBuffer != null;
     }
 
-    void copyFrom(byte[] src) {
-      assert byteArray != null;
-      byteArray.copyFrom(src);
+    public ByteBuffer getByteBuffer() {
+      assert byteBuffer != null;
+      return byteBuffer;
+    }
+
+    public ChunkByteBuffer getChunkBuffer() {
+      assert chunkBuffer != null;
+      return chunkBuffer;
     }
   }
 
-  public static class ChunkByteArray {
-    private final byte[] buf;
-    private final List<Integer> offsetsInBuf;
-    private final List<Integer> lengthsInBuf;
+  /**
+   * A utility to manage ByteBuffer slices for a reader.
+   */
+  public static class ChunkByteBuffer {
+    private final List<ByteBuffer> slices;
 
-    ChunkByteArray(byte[] buf) {
-      this.buf = buf;
-      this.offsetsInBuf = new ArrayList<>();
-      this.lengthsInBuf = new ArrayList<>();
+    ChunkByteBuffer() {
+      this.slices = new ArrayList<>();
     }
 
-    public int[] getOffsets() {
-      int[] offsets = new int[offsetsInBuf.size()];
-      for (int i = 0; i < offsets.length; i++) {
-        offsets[i] = offsetsInBuf.get(i);
-      }
-      return offsets;
+    public void addSlice(ByteBuffer buffer, int offset, int len) {
+      ByteBuffer tmp = buffer.duplicate();
+      tmp.position(buffer.position() + offset);
+      tmp.limit(buffer.position() + offset + len);
+      slices.add(tmp.slice());
     }
 
-    public int[] getLengths() {
-      int[] lens = new int[this.lengthsInBuf.size()];
-      for (int i = 0; i < lens.length; i++) {
-        lens[i] = this.lengthsInBuf.get(i);
-      }
-      return lens;
+    public ByteBuffer getSlice(int i) {
+      return slices.get(i);
     }
 
-    public byte[] buf() {
-      return buf;
+    public List<ByteBuffer> getSlices() {
+      return slices;
     }
 
-    void copyTo(byte[] target) {
-      int posInBuf = 0;
-      for (int i = 0; i < offsetsInBuf.size(); i++) {
-        System.arraycopy(buf, offsetsInBuf.get(i),
-            target, posInBuf, lengthsInBuf.get(i));
-        posInBuf += lengthsInBuf.get(i);
+    /**
+     * Note: the target will be in ready-to-read state after the call.
+     */
+    void copyTo(ByteBuffer target) {
+      for (ByteBuffer slice : slices) {
+        slice.flip();
+        target.put(slice);
       }
-    }
-
-    void copyFrom(byte[] src) {
-      int srcPos = 0;
-      for (int j = 0; j < offsetsInBuf.size(); j++) {
-        System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j),
-            lengthsInBuf.get(j));
-        srcPos += lengthsInBuf.get(j);
+      target.flip();
+    }
+
+    void copyFrom(ByteBuffer src) {
+      ByteBuffer tmp;
+      int len;
+      for (ByteBuffer slice : slices) {
+        len = slice.remaining();
+        tmp = src.duplicate();
+        tmp.limit(tmp.position() + len);
+        slice.put(tmp);
+        src.position(src.position() + len);
       }
     }
   }
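
ChunkByteBuffer keeps a list of zero-based slices into the caller's destination buffer: addSlice carves a window relative to the buffer's current position, so bytes written into a slice land directly in the right region of the destination with no final copy. A hedged standalone sketch that mirrors the addSlice arithmetic rather than calling the Hadoop class:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    class SliceSketch {
      public static void main(String[] args) {
        ByteBuffer dest = ByteBuffer.allocate(32);
        List<ByteBuffer> slices = new ArrayList<>();
        // Two windows at offsets 0 and 8, 4 bytes each, as addSlice would add.
        for (int[] w : new int[][] {{0, 4}, {8, 4}}) {
          ByteBuffer tmp = dest.duplicate();
          tmp.position(dest.position() + w[0]);
          tmp.limit(dest.position() + w[0] + w[1]);
          slices.add(tmp.slice());          // zero-based view sharing storage
        }
        slices.get(1).put(new byte[] {1, 2, 3, 4}); // fills dest[8..11]
        System.out.println(dest.get(8));            // prints 1
      }
    }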

http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 18c2de9..1e27745 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -57,7 +57,8 @@ import static org.junit.Assert.assertTrue;
 
 public class TestDFSStripedInputStream {
 
-  public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
+  public static final Log LOG =
+      LogFactory.getLog(TestDFSStripedInputStream.class);
 
   private MiniDFSCluster cluster;
   private Configuration conf = new Configuration();
@@ -272,12 +273,16 @@ public class TestDFSStripedInputStream {
     // |10     |
     done += in.read(0, readBuffer, 0, delta);
     assertEquals(delta, done);
+    assertArrayEquals(Arrays.copyOf(expected, done),
+        Arrays.copyOf(readBuffer, done));
     // both head and trail cells are partial
     // |c_0      |c_1    |c_2 |c_3 |c_4      |c_5         |
     // |256K - 10|missing|256K|256K|256K - 10|not in range|
     done += in.read(delta, readBuffer, delta,
         CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
     assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+    assertArrayEquals(Arrays.copyOf(expected, done),
+        Arrays.copyOf(readBuffer, done));
     // read the rest
     done += in.read(done, readBuffer, done, readSize - done);
     assertEquals(readSize, done);
@@ -291,8 +296,8 @@ public class TestDFSStripedInputStream {
     testStatefulRead(true, true);
   }
 
-  private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
-      throws Exception {
+  private void testStatefulRead(boolean useByteBuffer,
+      boolean cellMisalignPacket) throws Exception {
     final int numBlocks = 2;
     final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
     if (cellMisalignPacket) {
@@ -302,7 +307,8 @@ public class TestDFSStripedInputStream {
     }
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
         NUM_STRIPE_PER_BLOCK, false);
-    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize);
+    LocatedBlocks lbs = fs.getClient().namenode.
+        getBlockLocations(filePath.toString(), 0, fileSize);
 
     assert lbs.getLocatedBlocks().size() == numBlocks;
     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
@@ -360,4 +366,111 @@ public class TestDFSStripedInputStream {
     }
     fs.delete(filePath, true);
   }
+
+  @Test
+  public void testStatefulReadWithDNFailure() throws Exception {
+    final int numBlocks = 4;
+    final int failedDNIdx = DATA_BLK_NUM - 1;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i,
+          NUM_STRIPE_PER_BLOCK * CELLSIZE,
+          bg.getBlock().getGenerationStamp());
+      blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+      cluster.injectBlocks(i, Arrays.asList(blk),
+          bg.getBlock().getBlockPoolId());
+    }
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
+            ecPolicy, null);
+    int readSize = BLOCK_GROUP_SIZE;
+    byte[] readBuffer = new byte[readSize];
+    byte[] expected = new byte[readSize];
+    /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+    for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        for (int k = 0; k < CELLSIZE; k++) {
+          int posInBlk = i * CELLSIZE + k;
+          int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+          expected[posInFile] = SimulatedFSDataset.simulatedByte(
+              new Block(bg.getBlock().getBlockId() + j), posInBlk);
+        }
+      }
+    }
+
+    ErasureCoderOptions coderOptions = new ErasureCoderOptions(
+        DATA_BLK_NUM, PARITY_BLK_NUM);
+    RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(conf,
+        ecPolicy.getCodecName(), coderOptions);
+
+    // Update the expected content for decoded data
+    int[] missingBlkIdx = new int[PARITY_BLK_NUM];
+    for (int i = 0; i < missingBlkIdx.length; i++) {
+      if (i == 0) {
+        missingBlkIdx[i] = failedDNIdx;
+      } else {
+        missingBlkIdx[i] = DATA_BLK_NUM + i;
+      }
+    }
+    cluster.stopDataNode(failedDNIdx);
+    for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+      byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
+      byte[][] decodeOutputs = new byte[missingBlkIdx.length][CELLSIZE];
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
+        if (j != failedDNIdx) {
+          System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
+        }
+      }
+      for (int j = DATA_BLK_NUM; j < DATA_BLK_NUM + PARITY_BLK_NUM; j++) {
+        for (int k = 0; k < CELLSIZE; k++) {
+          int posInBlk = i * CELLSIZE + k;
+          decodeInputs[j][k] = SimulatedFSDataset.simulatedByte(
+              new Block(bg.getBlock().getBlockId() + j), posInBlk);
+        }
+      }
+      for (int m : missingBlkIdx) {
+        decodeInputs[m] = null;
+      }
+      rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+      int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
+      System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
+    }
+
+    int delta = 10;
+    int done = 0;
+    // read a small delta, shouldn't trigger decode
+    // |cell_0 |
+    // |10     |
+    done += in.read(readBuffer, 0, delta);
+    assertEquals(delta, done);
+    // both head and trail cells are partial
+    // |c_0      |c_1    |c_2 |c_3 |c_4      |c_5         |
+    // |256K - 10|missing|256K|256K|256K - 10|not in range|
+    while (done < (CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta)) {
+      int ret = in.read(readBuffer, delta,
+          CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
+      assertTrue(ret > 0);
+      done += ret;
+    }
+    assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+    // read the rest
+    int restSize = readSize - done;
+    while (done < restSize) {
+      int ret = in.read(readBuffer, done, restSize);
+      assertTrue(ret > 0);
+      done += ret;
+    }
+
+    assertEquals(readSize, done);
+    assertArrayEquals(expected, readBuffer);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
index 96fc79c..7d9d7dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
@@ -36,6 +36,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -242,7 +243,8 @@ public class TestStripedBlockUtil {
    */
   @Test
   public void testDivideByteRangeIntoStripes() {
-    byte[] assembled = new byte[BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE];
+    ByteBuffer assembled =
+        ByteBuffer.allocate(BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE);
     for (int bgSize : blockGroupSizes) {
       LocatedStripedBlock blockGroup = createDummyLocatedBlock(bgSize);
       byte[][] internalBlkBufs = createInternalBlkBuffers(bgSize);
@@ -252,7 +254,7 @@ public class TestStripedBlockUtil {
             continue;
           }
           AlignedStripe[] stripes = divideByteRangeIntoStripes(EC_POLICY,
-              CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled, 0);
+              CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled);
 
           for (AlignedStripe stripe : stripes) {
             for (int i = 0; i < DATA_BLK_NUM; i++) {
@@ -261,21 +263,21 @@ public class TestStripedBlockUtil {
                 continue;
               }
               int done = 0;
-              for (int j = 0; j < chunk.byteArray.getLengths().length; j++) {
-                System.arraycopy(internalBlkBufs[i],
-                    (int) stripe.getOffsetInBlock() + done, assembled,
-                    chunk.byteArray.getOffsets()[j],
-                    chunk.byteArray.getLengths()[j]);
-                done += chunk.byteArray.getLengths()[j];
+              int len;
+              for (ByteBuffer slice : chunk.getChunkBuffer().getSlices()) {
+                len = slice.remaining();
+                slice.put(internalBlkBufs[i],
+                    (int) stripe.getOffsetInBlock() + done, len);
+                done += len;
               }
             }
           }
           for (int i = 0; i < brSize; i++) {
-            if (hashIntToByte(brStart + i) != assembled[i]) {
+            if (hashIntToByte(brStart + i) != assembled.get(i)) {
               System.out.println("Oops");
             }
             assertEquals("Byte at " + (brStart + i) + " should be the same",
-                hashIntToByte(brStart + i), assembled[i]);
+                hashIntToByte(brStart + i), assembled.get(i));
           }
         }
       }

