HDFS-8669. Erasure Coding: handle missing internal block locations in 
DFSStripedInputStream. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/559ebd77
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/559ebd77
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/559ebd77

Branch: refs/heads/HDFS-7285-REBASE
Commit: 559ebd77dc48c2196add46b6fff51bfd4dfb3809
Parents: 436454a
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Jul 13 11:41:18 2015 -0700
Committer: Vinayakumar B <vinayakum...@apache.org>
Committed: Thu Aug 13 17:20:04 2015 +0530

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../org/apache/hadoop/hdfs/BlockReader.java     |   4 +-
 .../hadoop/hdfs/DFSStripedInputStream.java      | 331 ++++++++++++-------
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  11 +-
 .../apache/hadoop/hdfs/StripedFileTestUtil.java | 128 +++++++
 .../TestReadStripedFileWithMissingBlocks.java   | 150 +++++++++
 .../hadoop/hdfs/TestWriteReadStripedFile.java   | 162 +--------
 7 files changed, 511 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/559ebd77/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 7b6d165..cd9e19d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -344,3 +344,6 @@
 
     HDFS-8744. Erasure Coding: the number of chunks in packet is not updated
     when writing parity data. (Li Bo)
+
+    HDFS-8669. Erasure Coding: handle missing internal block locations in
+    DFSStripedInputStream. (jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/559ebd77/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index 0a5511e..8f988af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.EnumSet;
 
@@ -31,7 +32,7 @@ import org.apache.hadoop.util.DataChecksum;
  * from a single datanode.
  */
 @InterfaceAudience.Private
-public interface BlockReader extends ByteBufferReadable {
+public interface BlockReader extends ByteBufferReadable, Closeable {
   
 
   /* same interface as inputStream java.io.InputStream#read()
@@ -63,6 +64,7 @@ public interface BlockReader extends ByteBufferReadable {
    *
    * @throws IOException
    */
+  @Override // java.io.Closeable
   void close() throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/559ebd77/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index a71da93..7509003 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -43,6 +43,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 
@@ -113,16 +114,43 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
-  private final BlockReader[] blockReaders;
-  /**
-   * when initializing block readers, their starting offsets are set to the same
-   * number: the smallest internal block offsets among all the readers. This is
-   * because it is possible that for some internal blocks we have to read
-   * "backwards" for decoding purpose. We thus use this offset array to track
-   * offsets for all the block readers so that we can skip data if necessary.
-   */
-  private final long[] blockReaderOffsets;
-  private final DatanodeInfo[] currentNodes;
+  private static class BlockReaderInfo {
+    final BlockReader reader;
+    final DatanodeInfo datanode;
+    /**
+     * when initializing block readers, their starting offsets are set to the same
+     * number: the smallest internal block offsets among all the readers. This is
+     * because it is possible that for some internal blocks we have to read
+     * "backwards" for decoding purpose. We thus use this offset array to track
+     * offsets for all the block readers so that we can skip data if necessary.
+     */
+    long blockReaderOffset;
+    LocatedBlock targetBlock;
+    /**
+     * We use this field to indicate whether we should use this reader. In case
+     * we hit any issue with this reader, we set this field to true and avoid
+     * using it for the next stripe.
+     */
+    boolean shouldSkip = false;
+
+    BlockReaderInfo(BlockReader reader, LocatedBlock targetBlock,
+        DatanodeInfo dn, long offset) {
+      this.reader = reader;
+      this.targetBlock = targetBlock;
+      this.datanode = dn;
+      this.blockReaderOffset = offset;
+    }
+
+    void setOffset(long offset) {
+      this.blockReaderOffset = offset;
+    }
+
+    void skip() {
+      this.shouldSkip = true;
+    }
+  }
+
+  private final BlockReaderInfo[] blockReaders;
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
@@ -151,9 +179,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     dataBlkNum = (short) schema.getNumDataUnits();
     parityBlkNum = (short) schema.getNumParityUnits();
     groupSize = dataBlkNum + parityBlkNum;
-    blockReaders = new BlockReader[groupSize];
-    blockReaderOffsets = new long[groupSize];
-    currentNodes = new DatanodeInfo[groupSize];
+    blockReaders = new BlockReaderInfo[groupSize];
     curStripeRange = new StripeRange(0, 0);
     readingService =
         new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
@@ -218,18 +244,26 @@ public class DFSStripedInputStream extends DFSInputStream {
     for (int i = 0; i < dataBlkNum; i++) {
       LocatedBlock targetBlock = targetBlocks[i];
       if (targetBlock != null) {
-        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
-        if (retval != null) {
-          currentNodes[i] = retval.info;
-          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+        DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null);
+        if (dnInfo != null) {
+          BlockReader reader = getBlockReaderWithRetry(targetBlock,
               minOffset, targetBlock.getBlockSize() - minOffset,
-              retval.addr, retval.storageType, retval.info, target, retry);
-          blockReaderOffsets[i] = minOffset;
+              dnInfo.addr, dnInfo.storageType, dnInfo.info, target, retry);
+          if (reader != null) {
+            blockReaders[i] = new BlockReaderInfo(reader, targetBlock,
+                dnInfo.info, minOffset);
+          }
         }
       }
     }
   }
 
+  /**
+   * @throws IOException only when failing to refetch block token, which happens
+   * when this client cannot get located block information from NameNode. This
+   * method returns null instead of throwing exception when failing to connect
+   * to the DataNode.
+   */
   private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
       long offsetInBlock, long length, InetSocketAddress targetAddr,
       StorageType storageType, DatanodeInfo datanode, long offsetInFile,
@@ -275,21 +309,16 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
     for (int i = 0; i < groupSize; i++) {
       closeReader(i);
-      currentNodes[i] = null;
+      blockReaders[i] = null;
     }
     blockEnd = -1;
   }
 
   private void closeReader(int index) {
     if (blockReaders[index] != null) {
-      try {
-        blockReaders[index].close();
-      } catch (IOException e) {
-        DFSClient.LOG.error("error closing blockReader " + index, e);
-      }
-      blockReaders[index] = null;
+      IOUtils.cleanup(DFSClient.LOG, blockReaders[index].reader);
+      blockReaders[index].skip();
     }
-    blockReaderOffsets[index] = 0;
   }
 
   private long getOffsetInBlockGroup() {
@@ -323,16 +352,14 @@ public class DFSStripedInputStream extends DFSInputStream {
     AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, 
cellSize,
         blockGroup, offsetInBlockGroup,
         offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf);
-    // TODO handle null elements in blks (e.g., NN does not know locations for
-    // all the internal blocks)
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     // read the whole stripe
     for (AlignedStripe stripe : stripes) {
       // Parse group to get chosen DN location
       StripeReader sreader = new StatefulStripeReader(readingService, stripe,
-          blks);
-      sreader.readStripe(blks, corruptedBlockMap);
+          blks, corruptedBlockMap);
+      sreader.readStripe();
     }
     curStripeBuf.position(stripeBufOffset);
     curStripeBuf.limit(stripeLimit);
@@ -549,14 +576,13 @@ public class DFSStripedInputStream extends DFSInputStream {
         blockGroup, start, end, buf, offset);
     CompletionService<Void> readService = new ExecutorCompletionService<>(
         dfsClient.getStripedReadsThreadPool());
-    // TODO handle null elements in blks (e.g., NN does not know locations for
-    // all the internal blocks)
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     for (AlignedStripe stripe : stripes) {
       // Parse group to get chosen DN location
-      StripeReader preader = new PositionStripeReader(readService, stripe);
-      preader.readStripe(blks, corruptedBlockMap);
+      StripeReader preader = new PositionStripeReader(readService, stripe,
+          blks, corruptedBlockMap);
+      preader.readStripe();
     }
   }
 
@@ -586,43 +612,89 @@ public class DFSStripedInputStream extends DFSInputStream {
     final Map<Future<Void>, Integer> futures = new HashMap<>();
     final AlignedStripe alignedStripe;
     final CompletionService<Void> service;
+    final LocatedBlock[] targetBlocks;
+    final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
 
-    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe) {
+    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
+        LocatedBlock[] targetBlocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
       this.service = service;
       this.alignedStripe = alignedStripe;
+      this.targetBlocks = targetBlocks;
+      this.corruptedBlockMap = corruptedBlockMap;
     }
 
-    /** submit reading chunk task */
-    abstract void readChunk(final CompletionService<Void> service,
-        final LocatedBlock block, int chunkIndex,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap);
+    abstract boolean readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex);
 
-    /**
-     * When seeing first missing block, initialize decode input buffers.
-     * Also prepare the reading for data blocks outside of the reading range.
-     */
-    abstract void prepareDecodeInputs() throws IOException;
+    /** prepare all the data chunks */
+    abstract void prepareDecodeInputs();
 
-    /**
-     * Prepare reading for one more parity chunk.
-     */
-    abstract void prepareParityChunk() throws IOException;
+    /** prepare the parity chunk and block reader if necessary */
+    abstract boolean prepareParityChunk(int index) throws IOException;
 
     abstract void decode();
 
     abstract void updateState4SuccessRead(StripingChunkReadResult result);
 
+    private void checkMissingBlocks() throws IOException {
+      if (alignedStripe.missingChunksNum > parityBlkNum) {
+        clearFutures(futures.keySet());
+        throw new IOException(alignedStripe.missingChunksNum
+            + " missing blocks, the stripe is: " + alignedStripe);
+      }
+    }
+
+    /**
+     * We need decoding. Thus go through all the data chunks and make sure we
+     * submit read requests for all of them.
+     */
+    private void readDataForDecoding() throws IOException {
+      prepareDecodeInputs();
+      for (int i = 0; i < dataBlkNum; i++) {
+        Preconditions.checkNotNull(alignedStripe.chunks[i]);
+        if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
+          if (!readChunk(service, targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
+        }
+      }
+      checkMissingBlocks();
+    }
+
+    void readParityChunks(int num) throws IOException {
+      for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
+           i++) {
+        if (alignedStripe.chunks[i] == null) {
+          if (prepareParityChunk(i) && readChunk(service, targetBlocks[i], i)) {
+            j++;
+          } else {
+            alignedStripe.missingChunksNum++;
+          }
+        }
+      }
+      checkMissingBlocks();
+    }
+
     /** read the whole stripe. do decoding if necessary */
-    void readStripe(LocatedBlock[] blocks,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
-        throws IOException {
-      assert alignedStripe.getSpanInBlock() > 0;
-      for (short i = 0; i < dataBlkNum; i++) {
-        if (alignedStripe.chunks[i] != null
-            && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-          readChunk(service, blocks[i], i, corruptedBlockMap);
+    void readStripe() throws IOException {
+      for (int i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          if (!readChunk(service, targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
         }
       }
+      // There are missing block locations at this stage. Thus we need to read
+      // the full stripe and one more parity block.
+      if (alignedStripe.missingChunksNum > 0) {
+        checkMissingBlocks();
+        readDataForDecoding();
+        // read parity chunks
+        readParityChunks(alignedStripe.missingChunksNum);
+      }
+      // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
 
       // Input buffers for potential decode operation, which remains null until
       // first read failure
@@ -648,24 +720,15 @@ public class DFSStripedInputStream extends DFSInputStream {
             }
           } else {
             returnedChunk.state = StripingChunk.MISSING;
-            alignedStripe.missingChunksNum++;
-            if (alignedStripe.missingChunksNum > parityBlkNum) {
-              clearFutures(futures.keySet());
-              throw new IOException("Too many blocks are missing: "
-                  + alignedStripe);
-            }
-
-            prepareDecodeInputs();
-            prepareParityChunk();
             // close the corresponding reader
             closeReader(r.index);
 
-            for (int i = 0; i < alignedStripe.chunks.length; i++) {
-              StripingChunk chunk = alignedStripe.chunks[i];
-              if (chunk != null && chunk.state == StripingChunk.REQUESTED) {
-                readChunk(service, blocks[i], i, corruptedBlockMap);
-              }
-            }
+            final int missing = alignedStripe.missingChunksNum;
+            alignedStripe.missingChunksNum++;
+            checkMissingBlocks();
+
+            readDataForDecoding();
+            readParityChunks(alignedStripe.missingChunksNum - missing);
           }
         } catch (InterruptedException ie) {
           String err = "Read request interrupted";
@@ -686,20 +749,24 @@ public class DFSStripedInputStream extends DFSInputStream {
     private byte[][] decodeInputs = null;
 
     PositionStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe) {
-      super(service, alignedStripe);
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      super(service, alignedStripe, targetBlocks, corruptedBlockMap);
     }
 
     @Override
-    void readChunk(final CompletionService<Void> service,
-        final LocatedBlock block, int chunkIndex,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    boolean readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex) {
+      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      if (block == null) {
+        chunk.state = StripingChunk.MISSING;
+        return false;
+      }
       DatanodeInfo loc = block.getLocations()[0];
       StorageType type = block.getStorageTypes()[0];
       DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
           loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
           type);
-      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
       chunk.state = StripingChunk.PENDING;
       Callable<Void> readCallable = getFromOneDataNode(dnAddr,
           block, alignedStripe.getOffsetInBlock(),
@@ -715,6 +782,7 @@ public class DFSStripedInputStream extends DFSInputStream {
             + alignedStripe.getSpanInBlock() - 1));
       }
       futures.put(getFromDNRequest, chunkIndex);
+      return true;
     }
 
     @Override
@@ -728,18 +796,15 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
 
     @Override
-    void prepareParityChunk() {
-      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
-        if (alignedStripe.chunks[i] == null) {
-          final int decodeIndex = convertIndex4Decode(i,
-              dataBlkNum, parityBlkNum);
-          alignedStripe.chunks[i] =
-              new StripingChunk(decodeInputs[decodeIndex]);
-          alignedStripe.chunks[i].addByteArraySlice(0,
-              (int) alignedStripe.getSpanInBlock());
-          break;
-        }
-      }
+    boolean prepareParityChunk(int index) {
+      Preconditions.checkState(index >= dataBlkNum &&
+          alignedStripe.chunks[index] == null);
+      final int decodeIndex = convertIndex4Decode(index, dataBlkNum,
+          parityBlkNum);
+      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+      alignedStripe.chunks[index].addByteArraySlice(0,
+          (int) alignedStripe.getSpanInBlock());
+      return true;
     }
 
     @Override
@@ -753,39 +818,43 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   class StatefulStripeReader extends StripeReader {
     ByteBuffer[] decodeInputs;
-    final LocatedBlock[] targetBlocks;
 
     StatefulStripeReader(CompletionService<Void> service,
-        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks) {
-      super(service, alignedStripe);
-      this.targetBlocks = targetBlocks;
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      super(service, alignedStripe, targetBlocks, corruptedBlockMap);
     }
 
     @Override
-    void readChunk(final CompletionService<Void> service,
-        final LocatedBlock block, int chunkIndex, Map<ExtendedBlock,
-        Set<DatanodeInfo>> corruptedBlockMap) {
-      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+    boolean readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex) {
+      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      final BlockReaderInfo readerInfo = blockReaders[chunkIndex];
+      if (readerInfo == null || block == null || readerInfo.shouldSkip) {
+        chunk.state = StripingChunk.MISSING;
+        return false;
+      }
       chunk.state = StripingChunk.PENDING;
       ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
-      Callable<Void> readCallable = readCell(blockReaders[chunkIndex],
-          currentNodes[chunkIndex], blockReaderOffsets[chunkIndex],
+      Callable<Void> readCallable = readCell(readerInfo.reader,
+          readerInfo.datanode, readerInfo.blockReaderOffset,
           alignedStripe.getOffsetInBlock(), strategy,
           chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap);
       Future<Void> request = readingService.submit(readCallable);
       futures.put(request, chunkIndex);
+      return true;
     }
 
     @Override
     void updateState4SuccessRead(StripingChunkReadResult result) {
       Preconditions.checkArgument(
           result.state == StripingChunkReadResult.SUCCESSFUL);
-      blockReaderOffsets[result.index] =
-          alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock();
+      blockReaders[result.index].setOffset(alignedStripe.getOffsetInBlock()
+          + alignedStripe.getSpanInBlock());
     }
 
     @Override
-    void prepareDecodeInputs() throws IOException {
+    void prepareDecodeInputs() {
       if (decodeInputs == null) {
         decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
         ByteBuffer cur = curStripeBuf.duplicate();
@@ -799,52 +868,58 @@ public class DFSStripedInputStream extends DFSInputStream {
               parityBlkNum);
           decodeInputs[decodeIndex] = cur.slice();
           if (alignedStripe.chunks[i] == null) {
-            alignedStripe.chunks[i] =
-                new StripingChunk(decodeInputs[decodeIndex]);
+            alignedStripe.chunks[i] = new StripingChunk(
+                decodeInputs[decodeIndex]);
           }
         }
       }
     }
 
     @Override
-    void prepareParityChunk() throws IOException {
-      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
-        if (alignedStripe.chunks[i] == null) {
-          final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
-              parityBlkNum);
-          decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
-              (int) alignedStripe.range.spanInBlock);
-          alignedStripe.chunks[i] =
-              new StripingChunk(decodeInputs[decodeIndex]);
-          if (blockReaders[i] == null) {
-            prepareParityBlockReader(i);
-          }
-          break;
-        }
+    boolean prepareParityChunk(int index) throws IOException {
+      Preconditions.checkState(index >= dataBlkNum
+          && alignedStripe.chunks[index] == null);
+      if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
+        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
+        // we have failed the block reader before
+        return false;
       }
+      final int decodeIndex = convertIndex4Decode(index, dataBlkNum,
+          parityBlkNum);
+      decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
+          (int) alignedStripe.range.spanInBlock);
+      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+      if (blockReaders[index] == null && !prepareParityBlockReader(index)) {
+        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
+        return false;
+      }
+      return true;
     }
 
-    private void prepareParityBlockReader(int i) throws IOException {
+    private boolean prepareParityBlockReader(int i) throws IOException {
       // prepare the block reader for the parity chunk
       LocatedBlock targetBlock = targetBlocks[i];
       if (targetBlock != null) {
         final long offsetInBlock = alignedStripe.getOffsetInBlock();
-        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
-        if (retval != null) {
-          currentNodes[i] = retval.info;
-          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+        DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null);
+        if (dnInfo != null) {
+          BlockReader reader = getBlockReaderWithRetry(targetBlock,
               offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
-              retval.addr, retval.storageType, retval.info,
+              dnInfo.addr, dnInfo.storageType, dnInfo.info,
               DFSStripedInputStream.this.getPos(), retry);
-          blockReaderOffsets[i] = offsetInBlock;
+          if (reader != null) {
+            blockReaders[i] = new BlockReaderInfo(reader, targetBlock,
+                dnInfo.info, offsetInBlock);
+            return true;
+          }
         }
       }
+      return false;
     }
 
     @Override
     void decode() {
-      // TODO no copy for data chunks. this depends on HADOOP-12047 for some
-      // decoders to work
+      // TODO no copy for data chunks. this depends on HADOOP-12047
       final int span = (int) alignedStripe.getSpanInBlock();
       for (int i = 0; i < alignedStripe.chunks.length; i++) {
         final int decodeIndex = convertIndex4Decode(i,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/559ebd77/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 579434b..6bd5e1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -83,6 +83,7 @@ public class StripedBlockUtil {
     LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum];
     for (short i = 0; i < locatedBGSize; i++) {
       final int idx = bg.getBlockIndices()[i];
+      // for now we do not use redundant replica of an internal block
       if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) {
         lbs[idx] = constructInternalBlock(bg, i, cellSize,
             dataBlkNum, idx);
@@ -212,7 +213,9 @@ public class StripedBlockUtil {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
     } catch (ExecutionException e) {
-      DFSClient.LOG.warn("ExecutionException " + e);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("ExecutionException " + e);
+      }
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.FAILED);
     } catch (CancellationException e) {
@@ -623,7 +626,7 @@ public class StripedBlockUtil {
             cellSize, dataBlkNum, i);
         if (internalBlkLen <= s.getOffsetInBlock()) {
           Preconditions.checkState(s.chunks[i] == null);
-          s.chunks[i] = new StripingChunk(); // chunk state is set to ALLZERO
+          s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO);
         }
       }
     }
@@ -841,10 +844,10 @@ public class StripedBlockUtil {
       this.byteBuffer = buf;
     }
 
-    public StripingChunk() {
+    public StripingChunk(int state) {
       this.byteArray = null;
       this.byteBuffer = null;
-      this.state = ALLZERO;
+      this.state = state;
     }
 
     public void addByteArraySlice(int offset, int length) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/559ebd77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 2369704..815a50d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -18,9 +18,16 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.junit.Assert;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 public class StripedFileTestUtil {
@@ -56,4 +63,125 @@ public class StripedFileTestUtil {
     final int mod = 29;
     return (byte) (pos % mod + 1);
   }
+
+  static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    FileStatus status = fs.getFileStatus(srcPath);
+    Assert.assertEquals("File length should be the same", fileLength, status.getLen());
+  }
+
+  static void verifyPread(FileSystem fs, Path srcPath,  int fileLength,
+      byte[] expected, byte[] buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
+          cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
+          cellSize * dataBlocks, fileLength - 102, fileLength - 1};
+      for (int startOffset : startOffsets) {
+        startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+        int remaining = fileLength - startOffset;
+        in.readFully(startOffset, buf, 0, remaining);
+        for (int i = 0; i < remaining; i++) {
+          Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
+              "same", expected[startOffset + i], buf[i]);
+        }
+      }
+    }
+  }
+
+  static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+      byte[] expected, byte[] buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      final byte[] result = new byte[fileLength];
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf, 0, buf.length)) >= 0) {
+        System.arraycopy(buf, 0, result, readLen, ret);
+        readLen += ret;
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result);
+    }
+  }
+
+  static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+      byte[] expected, ByteBuffer buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      ByteBuffer result = ByteBuffer.allocate(fileLength);
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result.array());
+    }
+  }
+
+  static void verifySeek(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      // seek to 1/2 of content
+      int pos = fileLength / 2;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 1/3 of content
+      pos = fileLength / 3;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 0 pos
+      pos = 0;
+      assertSeekAndRead(in, pos, fileLength);
+
+      if (fileLength > cellSize) {
+        // seek to cellSize boundary
+        pos = cellSize - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > cellSize * dataBlocks) {
+        // seek to striped cell group boundary
+        pos = cellSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > blockSize * dataBlocks) {
+        // seek to striped block group boundary
+        pos = blockSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
+        try {
+          in.seek(-1);
+          Assert.fail("Should be failed if seek to negative offset");
+        } catch (EOFException e) {
+          // expected
+        }
+
+        try {
+          in.seek(fileLength + 1);
+          Assert.fail("Should be failed if seek after EOF");
+        } catch (EOFException e) {
+          // expected
+        }
+      }
+    }
+  }
+
+  static void assertSeekAndRead(FSDataInputStream fsdis, int pos,
+      int writeBytes) throws IOException {
+    fsdis.seek(pos);
+    byte[] buf = new byte[writeBytes];
+    int readLen = StripedFileTestUtil.readAll(fsdis, buf);
+    Assert.assertEquals(readLen, writeBytes - pos);
+    for (int i = 0; i < readLen; i++) {
+      Assert.assertEquals("Byte at " + i + " should be the same",
+          StripedFileTestUtil.getByte(pos + i), buf[i]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/559ebd77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
new file mode 100644
index 0000000..4c2438d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+
+/**
+ * Test reading a striped file when some of its blocks are missing (not included
+ * in the block locations returned by the NameNode).
+ */
+public class TestReadStripedFileWithMissingBlocks {
+  public static final Log LOG = LogFactory
+      .getLog(TestReadStripedFileWithMissingBlocks.class);
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static Configuration conf = new HdfsConfiguration();
+  private final int fileLength = blockSize * dataBlocks + 123;
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks1() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 0);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks2() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 1);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks3() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 2);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks4() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 0);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks5() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 1);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks6() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 3, 0);
+  }
+
+  private void readFileWithMissingBlocks(Path srcPath, int fileLength,
+      int missingDataNum, int missingParityNum)
+      throws IOException {
+    LOG.info("readFileWithMissingBlocks: (" + missingDataNum + ","
+        + missingParityNum + ")");
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+    DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+    int dataBlocks = (fileLength - 1) / cellSize + 1;
+    BlockLocation[] locs = fs.getFileBlockLocations(srcPath, 0, cellSize);
+
+    int[] missingDataNodes = new int[missingDataNum + missingParityNum];
+    for (int i = 0; i < missingDataNum; i++) {
+      missingDataNodes[i] = i;
+    }
+    for (int i = 0; i < missingParityNum; i++) {
+      missingDataNodes[i + missingDataNum] = i +
+          Math.min(StripedFileTestUtil.dataBlocks, dataBlocks);
+    }
+    stopDataNodes(locs, missingDataNodes);
+
+    // make sure there are missing block locations
+    BlockLocation[] newLocs = fs.getFileBlockLocations(srcPath, 0, cellSize);
+    Assert.assertTrue(newLocs[0].getNames().length < locs[0].getNames().length);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    // delete the file
+    fs.delete(srcPath, true);
+  }
+
+  private void stopDataNodes(BlockLocation[] locs, int[] datanodes)
+      throws IOException {
+    if (locs != null && locs.length > 0) {
+      for (int failedDNIdx : datanodes) {
+        String name = (locs[0].getNames())[failedDNIdx];
+        for (DataNode dn : cluster.getDataNodes()) {
+          int port = dn.getXferPort();
+          if (name.contains(Integer.toString(port))) {
+            dn.shutdown();
+            cluster.setDataNodeDead(dn.getDatanodeId());
+            LOG.info("stop datanode " + failedDNIdx);
+            break;
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/559ebd77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 272650d..2f9322d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -21,20 +21,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -48,11 +43,10 @@ public class TestWriteReadStripedFile {
   public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class);
   private static MiniDFSCluster cluster;
   private static FileSystem fs;
-  private static Configuration conf;
+  private static Configuration conf = new HdfsConfiguration();
 
   @Before
   public void setup() throws IOException {
-    conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/",
@@ -175,18 +169,6 @@ public class TestWriteReadStripedFile {
             + cellSize + 123, true);
   }
 
-  private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
-                                 int writeBytes) throws IOException {
-    fsdis.seek(pos);
-    byte[] buf = new byte[writeBytes];
-    int readLen = StripedFileTestUtil.readAll(fsdis, buf);
-    Assert.assertEquals(readLen, writeBytes - pos);
-    for (int i = 0; i < readLen; i++) {
-      Assert.assertEquals("Byte at " + i + " should be the same",
-          StripedFileTestUtil.getByte(pos + i), buf[i]);
-    }
-  }
-
   private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
       throws IOException {
     testOneFileUsingDFSStripedInputStream(src, fileLength, false);
@@ -198,7 +180,7 @@ public class TestWriteReadStripedFile {
     Path srcPath = new Path(src);
     DFSTestUtil.writeFile(fs, srcPath, new String(expected));
 
-    verifyLength(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
 
     if (withDataNodeFailure) {
       int dnIndex = 1; // TODO: StripedFileTestUtil.random.nextInt(dataBlocks);
@@ -208,14 +190,16 @@ public class TestWriteReadStripedFile {
 
     byte[] smallBuf = new byte[1024];
     byte[] largeBuf = new byte[fileLength + 100];
-    verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
 
-    verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
-    verifySeek(fs, srcPath, fileLength);
-    verifyStatefulRead(fs, srcPath, fileLength, expected,
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        largeBuf);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
         ByteBuffer.allocate(fileLength + 100));
-    verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
-    verifyStatefulRead(fs, srcPath, fileLength, expected,
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
         ByteBuffer.allocate(1024));
   }
 
@@ -241,130 +225,18 @@ public class TestWriteReadStripedFile {
     final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
     FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
         WebHdfsConstants.WEBHDFS_SCHEME);
-    Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
+    Path srcPath = new Path("/testWriteReadUsingWebHdfs");
     DFSTestUtil.writeFile(fs, srcPath, new String(expected));
 
-    verifyLength(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
 
     byte[] smallBuf = new byte[1024];
     byte[] largeBuf = new byte[fileLength + 100];
-    verifyPread(fs, srcPath, fileLength, expected, largeBuf);
-
-    verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
-    verifySeek(fs, srcPath, fileLength);
-    verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
-    //webhdfs doesn't support bytebuffer read
-  }
-
-  void verifyLength(FileSystem fs, Path srcPath, int fileLength)
-      throws IOException {
-    FileStatus status = fs.getFileStatus(srcPath);
-    Assert.assertEquals("File length should be the same",
-        fileLength, status.getLen());
-  }
-
-  void verifyPread(FileSystem fs, Path srcPath,  int fileLength,
-                   byte[] expected, byte[] buf) throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
-          cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
-          cellSize * dataBlocks, fileLength - 102, fileLength - 1};
-      for (int startOffset : startOffsets) {
-        startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
-        int remaining = fileLength - startOffset;
-        in.readFully(startOffset, buf, 0, remaining);
-        for (int i = 0; i < remaining; i++) {
-          Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
-              "same", expected[startOffset + i], buf[i]);
-        }
-      }
-    }
-  }
-
-  void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
-                          byte[] expected, byte[] buf) throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      final byte[] result = new byte[fileLength];
-      int readLen = 0;
-      int ret;
-      while ((ret = in.read(buf, 0, buf.length)) >= 0) {
-        System.arraycopy(buf, 0, result, readLen, ret);
-        readLen += ret;
-      }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
-      Assert.assertArrayEquals(expected, result);
-    }
-  }
-
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
 
-  void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
-                          byte[] expected, ByteBuffer buf) throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      ByteBuffer result = ByteBuffer.allocate(fileLength);
-      int readLen = 0;
-      int ret;
-      while ((ret = in.read(buf)) >= 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
-      }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
-      Assert.assertArrayEquals(expected, result.array());
-    }
-  }
-
-
-  void verifySeek(FileSystem fs, Path srcPath, int fileLength)
-      throws IOException {
-    try (FSDataInputStream in = fs.open(srcPath)) {
-      // seek to 1/2 of content
-      int pos = fileLength / 2;
-      assertSeekAndRead(in, pos, fileLength);
-
-      // seek to 1/3 of content
-      pos = fileLength / 3;
-      assertSeekAndRead(in, pos, fileLength);
-
-      // seek to 0 pos
-      pos = 0;
-      assertSeekAndRead(in, pos, fileLength);
-
-      if (fileLength > cellSize) {
-        // seek to cellSize boundary
-        pos = cellSize - 1;
-        assertSeekAndRead(in, pos, fileLength);
-      }
-
-      if (fileLength > cellSize * dataBlocks) {
-        // seek to striped cell group boundary
-        pos = cellSize * dataBlocks - 1;
-        assertSeekAndRead(in, pos, fileLength);
-      }
-
-      if (fileLength > blockSize * dataBlocks) {
-        // seek to striped block group boundary
-        pos = blockSize * dataBlocks - 1;
-        assertSeekAndRead(in, pos, fileLength);
-      }
-
-      if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
-        try {
-          in.seek(-1);
-          Assert.fail("Should be failed if seek to negative offset");
-        } catch (EOFException e) {
-          // expected
-        }
-
-        try {
-          in.seek(fileLength + 1);
-          Assert.fail("Should be failed if seek after EOF");
-        } catch (EOFException e) {
-          // expected
-        }
-      }
-    }
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
+    // webhdfs doesn't support bytebuffer read
   }
 }

Reply via email to