Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 2d847e7d6 -> 71329e817


HDFS-8517. Fix a decoding issue in striped block recovery on the client side.
Contributed by Kai Zheng.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71329e81
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71329e81
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71329e81

Branch: refs/heads/HDFS-7285
Commit: 71329e817b99dbee630f902fa3640c3c93f04a44
Parents: 2d847e7
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jun 2 15:35:49 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jun 2 15:35:49 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      |   7 +-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  57 +++--
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  14 +-
 .../apache/hadoop/hdfs/StripedFileTestUtil.java |  59 +++++
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |   7 +-
 .../hdfs/TestReadStripedFileWithDecoding.java   | 108 +++++++++
 .../hadoop/hdfs/TestWriteReadStripedFile.java   | 238 ++++++-------------
 8 files changed, 305 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index c3c55c7..fa0a8e2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -271,3 +271,6 @@
 
     HDFS-8444. Erasure Coding: fix cannot rename a zone dir
     (Walter Su via vinayakumarb)
+
+    HDFS-8517. Fix a decoding issue in stripped block recovering in client side.
+    (Kai Zheng via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index b4aa033..228368b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -597,9 +597,10 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
 
     if (alignedStripe.missingChunksNum > 0) {
-      finalizeDecodeInputs(decodeInputs, alignedStripe);
-      decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum,
-          decoder);
+      finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum,
+          alignedStripe);
+      decodeAndFillBuffer(decodeInputs, buf, alignedStripe, dataBlkNum,
+          parityBlkNum, decoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index e75209f..80321ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 import java.util.*;
@@ -257,7 +256,8 @@ public class StripedBlockUtil {
         new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
       if (alignedStripe.chunks[i] == null) {
-        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
+        final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
         alignedStripe.chunks[i].offsetsInBuf.add(0);
        alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
       }
@@ -273,35 +273,57 @@ public class StripedBlockUtil {
    * finalize decode input buffers.
    */
   public static void finalizeDecodeInputs(final byte[][] decodeInputs,
-      AlignedStripe alignedStripe) {
+      int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) {
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      StripingChunk chunk = alignedStripe.chunks[i];
+      final StripingChunk chunk = alignedStripe.chunks[i];
+      final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
       if (chunk.state == StripingChunk.FETCHED) {
         int posInBuf = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
           System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
-              decodeInputs[i], posInBuf, chunk.lengthsInBuf.get(j));
+              decodeInputs[decodeIndex], posInBuf, chunk.lengthsInBuf.get(j));
           posInBuf += chunk.lengthsInBuf.get(j);
         }
       } else if (chunk.state == StripingChunk.ALLZERO) {
-        Arrays.fill(decodeInputs[i], (byte)0);
+        Arrays.fill(decodeInputs[decodeIndex], (byte) 0);
       } else {
-        decodeInputs[i] = null;
+        decodeInputs[decodeIndex] = null;
       }
     }
   }
+
+  /**
+   * Currently decoding requires parity chunks are before data chunks.
+   * The indices are opposite to what we store in NN. In future we may
+   * improve the decoding to make the indices order the same as in NN.
+   *
+   * @param index The index to convert
+   * @param dataBlkNum The number of data blocks
+   * @param parityBlkNum The number of parity blocks
+   * @return converted index
+   */
+  public static int convertIndex4Decode(int index, int dataBlkNum,
+      int parityBlkNum) {
+    return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
+  }
+
+  public static int convertDecodeIndexBack(int index, int dataBlkNum,
+      int parityBlkNum) {
+    return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
+  }
+
   /**
    * Decode based on the given input buffers and schema.
    */
   public static void decodeAndFillBuffer(final byte[][] decodeInputs,
-      byte[] buf, AlignedStripe alignedStripe, int parityBlkNum,
+      byte[] buf, AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
       RawErasureDecoder decoder) {
     // Step 1: prepare indices and output buffers for missing data units
     int[] decodeIndices = new int[parityBlkNum];
     int pos = 0;
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
       if (alignedStripe.chunks[i].state == StripingChunk.MISSING){
-        decodeIndices[pos++] = i;
+        decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
       }
     }
     decodeIndices = Arrays.copyOf(decodeIndices, pos);
@@ -313,13 +335,14 @@ public class StripedBlockUtil {
 
     // Step 3: fill original application buffer with decoded data
     for (int i = 0; i < decodeIndices.length; i++) {
-      int missingBlkIdx = decodeIndices[i];
+      int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i],
+          dataBlkNum, parityBlkNum);
       StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
       if (chunk.state == StripingChunk.MISSING) {
         int srcPos = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
-          System.arraycopy(decodeOutputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
-              chunk.lengthsInBuf.get(j));
+          System.arraycopy(decodeOutputs[i], srcPos, buf,
+              chunk.offsetsInBuf.get(j), chunk.lengthsInBuf.get(j));
           srcPos += chunk.lengthsInBuf.get(j);
         }
       }
@@ -330,7 +353,7 @@ public class StripedBlockUtil {
    * This method divides a requested byte range into an array of inclusive
    * {@link AlignedStripe}.
    * @param ecSchema The codec schema for the file, which carries the numbers
-   *                 of data / parity blocks, as well as cell size
+   *                 of data / parity blocks
    * @param cellSize Cell size of stripe
    * @param blockGroup The striped block group
    * @param rangeStartInBlockGroup The byte range's start offset in block group
@@ -345,7 +368,6 @@ public class StripedBlockUtil {
       int cellSize, LocatedStripedBlock blockGroup,
       long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
       int offsetInBuf) {
-    // TODO: change ECSchema naming to use cell size instead of chunk size
 
     // Step 0: analyze range and calculate basic parameters
     int dataBlkNum = ecSchema.getNumDataUnits();
@@ -362,8 +384,7 @@ public class StripedBlockUtil {
     AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
 
     // Step 4: calculate each chunk's position in destination buffer
-    calcualteChunkPositionsInBuf(ecSchema, cellSize, stripes, cells, buf,
-        offsetInBuf);
+    calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);
 
     // Step 5: prepare ALLZERO blocks
     prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
@@ -508,8 +529,8 @@ public class StripedBlockUtil {
     return stripes.toArray(new AlignedStripe[stripes.size()]);
   }
 
-  private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
-      int cellSize, AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
+  private static void calcualteChunkPositionsInBuf(int cellSize,
+      AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
       int offsetInBuf) {
     /**
      *     | <--------------- AlignedStripe --------------->|

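For context on the new helpers above: the raw RS decoder consumes its input array with parity chunks before data chunks, while block groups are stored data-first in the NameNode, so reads must remap indices before decoding and map recovered indices back afterwards. The following sketch is not part of the commit; it re-implements the two conversions in isolation and assumes the RS-6-3 schema (6 data units, 3 parity units) used elsewhere in the branch.

// Standalone sketch, not the Hadoop class: round-trips every block index
// through the two conversions to show the parity-first decode ordering.
public class IndexConversionSketch {

  // data index i -> i + parityBlkNum; parity index i -> i - dataBlkNum
  static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
    return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
  }

  // inverse mapping, back to the data-first ordering stored in the NameNode
  static int convertDecodeIndexBack(int index, int dataBlkNum, int parityBlkNum) {
    return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
  }

  public static void main(String[] args) {
    final int dataBlkNum = 6;    // assumed RS-6-3 schema
    final int parityBlkNum = 3;
    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
      int decodeIdx = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
      int back = convertDecodeIndexBack(decodeIdx, dataBlkNum, parityBlkNum);
      // d0..d5 land in decode slots 3..8, p0..p2 in slots 0..2
      System.out.printf("block %d -> decode slot %d -> back to %d%n", i, decodeIdx, back);
      if (back != i) {
        throw new AssertionError("round trip must restore the original index");
      }
    }
  }
}
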
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 82c0781..6cd7003 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -790,15 +790,21 @@ public class DFSTestUtil {
     return os.toByteArray();
   }
 
-  /* Write the given string to the given file */
-  public static void writeFile(FileSystem fs, Path p, String s) 
+  /* Write the given bytes to the given file */
+  public static void writeFile(FileSystem fs, Path p, byte[] bytes)
       throws IOException {
     if (fs.exists(p)) {
       fs.delete(p, true);
     }
-    InputStream is = new ByteArrayInputStream(s.getBytes());
+    InputStream is = new ByteArrayInputStream(bytes);
     FSDataOutputStream os = fs.create(p);
-    IOUtils.copyBytes(is, os, s.length(), true);
+    IOUtils.copyBytes(is, os, bytes.length, true);
+  }
+
+  /* Write the given string to the given file */
+  public static void writeFile(FileSystem fs, Path p, String s)
+      throws IOException {
+    writeFile(fs, p, s.getBytes());
   }
 
   /* Append the given string to the given file */

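The new byte[] overload in DFSTestUtil lets the striped tests write their generated pattern directly instead of round-tripping it through new String(bytes) and back, which depends on the platform default charset. A minimal usage sketch follows; it is not from the patch, and it assumes a hypothetical helper living in a test class in the org.apache.hadoop.hdfs package so the package-private StripedFileTestUtil methods are visible.

// Hypothetical helper (not in the patch): write the generated bytes verbatim.
static void writeDeterministicFile(FileSystem fs, Path p, int length)
    throws IOException {
  byte[] payload = StripedFileTestUtil.generateBytes(length); // pattern: pos % 29 + 1
  DFSTestUtil.writeFile(fs, p, payload); // byte-exact, no byte[] -> String round trip
}

// Example call from a test, assuming a running MiniDFSCluster named 'cluster':
//   writeDeterministicFile(cluster.getFileSystem(), new Path("/foo"), blockSize * dataBlocks);
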
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
new file mode 100644
index 0000000..54367d7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class StripedFileTestUtil {
+  static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+  static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  static final int stripesPerBlock = 4;
+  static final int blockSize = cellSize * stripesPerBlock;
+  static final int numDNs = dataBlocks + parityBlocks + 2;
+
+  static final Random r = new Random();
+
+  static byte[] generateBytes(int cnt) {
+    byte[] bytes = new byte[cnt];
+    for (int i = 0; i < cnt; i++) {
+      bytes[i] = getByte(i);
+    }
+    return bytes;
+  }
+
+  static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
+    int readLen = 0;
+    int ret;
+    while ((ret = in.read(buf, readLen, buf.length - readLen)) >= 0 &&
+        readLen <= buf.length) {
+      readLen += ret;
+    }
+    return readLen;
+  }
+
+  static byte getByte(long pos) {
+    final int mod = 29;
+    return (byte) (pos % mod + 1);
+  }
+}

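The utility above centralizes the deterministic test pattern: getByte(pos) returns (pos % 29) + 1, so any check can recompute the expected byte at an absolute file offset without keeping the written buffer around, and readAll drains a stream into a fixed-size buffer. A small sketch of how a test might combine the two; it is not part of the patch and assumes an already-open FSDataInputStream plus same-package access to the helpers.

// Hypothetical check (not in the patch): read from 'pos' and compare each byte
// against the position-derived pattern from StripedFileTestUtil.
static void assertPatternAt(FSDataInputStream in, long pos, int len)
    throws IOException {
  byte[] buf = new byte[len];
  in.seek(pos);
  int readLen = StripedFileTestUtil.readAll(in, buf);      // drain into buf
  for (int i = 0; i < readLen; i++) {
    byte expected = StripedFileTestUtil.getByte(pos + i);  // (pos % 29) + 1
    if (buf[i] != expected) {
      throw new AssertionError("unexpected byte at file offset " + (pos + i));
    }
  }
}
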
http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index ce56325..b64e690 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -208,17 +208,18 @@ public class TestDFSStripedInputStream {
     // Update the expected content for decoded data
     for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
       byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
-      int[] missingBlkIdx = new int[]{failedDNIdx, DATA_BLK_NUM+1, DATA_BLK_NUM+2};
+      int[] missingBlkIdx = new int[]{failedDNIdx + PARITY_BLK_NUM, 1, 2};
       byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
       for (int j = 0; j < DATA_BLK_NUM; j++) {
         int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
         if (j != failedDNIdx) {
-          System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
+          System.arraycopy(expected, posInBuf, decodeInputs[j + PARITY_BLK_NUM],
+              0, CELLSIZE);
         }
       }
       for (int k = 0; k < CELLSIZE; k++) {
         int posInBlk = i * CELLSIZE + k;
-        decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
+        decodeInputs[0][k] = SimulatedFSDataset.simulatedByte(
             new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
       }
       for (int m : missingBlkIdx) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
new file mode 100644
index 0000000..7397caf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+
+public class TestReadStripedFileWithDecoding {
+
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testWritePreadWithDNFailure1() throws IOException {
+    testWritePreadWithDNFailure("/foo", 0);
+  }
+
+  @Test
+  public void testWritePreadWithDNFailure2() throws IOException {
+    testWritePreadWithDNFailure("/foo", cellSize * 5);
+  }
+
+  private void testWritePreadWithDNFailure(String file, int startOffsetInFile)
+      throws IOException {
+    final int failedDNIdx = 2;
+    final int length = cellSize * (dataBlocks + 2);
+    Path testPath = new Path(file);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, testPath, bytes);
+
+    // shut down the DN that holds the last internal data block
+    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
+        cellSize);
+    String name = (locs[0].getNames())[failedDNIdx];
+    for (DataNode dn : cluster.getDataNodes()) {
+      int port = dn.getXferPort();
+      if (name.contains(Integer.toString(port))) {
+        dn.shutdown();
+        break;
+      }
+    }
+
+    // pread
+    try (FSDataInputStream fsdis = fs.open(testPath)) {
+      byte[] buf = new byte[length];
+      int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
+      Assert.assertEquals("The length of file should be the same to write size",
+          length - startOffsetInFile, readLen);
+
+      byte[] expected = new byte[readLen];
+      for (int i = startOffsetInFile; i < length; i++) {
+        expected[i - startOffsetInFile] = StripedFileTestUtil.getByte(i);
+      }
+
+      for (int i = startOffsetInFile; i < length; i++) {
+        Assert.assertEquals("Byte at " + i + " should be the same",
+            expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71329e81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index b0436a6..e2e5246 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -18,17 +18,13 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -37,34 +33,30 @@ import org.junit.Test;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Random;
 
-public class TestWriteReadStripedFile {
-  private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
-  private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
-
-  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final static int stripesPerBlock = 4;
-  static int blockSize = cellSize * stripesPerBlock;
-  static int numDNs = dataBlocks + parityBlocks + 2;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock;
 
+public class TestWriteReadStripedFile {
   private static MiniDFSCluster cluster;
-  private static Configuration conf;
   private static FileSystem fs;
-
-  private static Random r= new Random();
+  private static Configuration conf;
 
   @BeforeClass
   public static void setup() throws IOException {
-    conf = new Configuration();
+    conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().createErasureCodingZone("/", null, cellSize);
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
     fs = cluster.getFileSystem();
   }
 
   @AfterClass
-  public static void tearDown() {
+  public static void tearDown() throws IOException {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -152,47 +144,21 @@ public class TestWriteReadStripedFile {
             + cellSize + 123);
   }
 
-  private byte[] generateBytes(int cnt) {
-    byte[] bytes = new byte[cnt];
-    for (int i = 0; i < cnt; i++) {
-      bytes[i] = getByte(i);
-    }
-    return bytes;
-  }
-
-  private int readAll(FSDataInputStream in, byte[] buf) throws IOException {
-    int readLen = 0;
-    int ret;
-    do {
-      ret = in.read(buf, readLen, buf.length - readLen);
-      if (ret > 0) {
-        readLen += ret;
-      }
-    } while (ret >= 0 && readLen < buf.length);
-    return readLen;
-  }
-
-  private byte getByte(long pos) {
-    final int mod = 29;
-    return (byte) (pos % mod + 1);
-  }
-
   private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
                                  int writeBytes) throws IOException {
     fsdis.seek(pos);
     byte[] buf = new byte[writeBytes];
-    int readLen = readAll(fsdis, buf);
+    int readLen = StripedFileTestUtil.readAll(fsdis, buf);
     Assert.assertEquals(readLen, writeBytes - pos);
     for (int i = 0; i < readLen; i++) {
       Assert.assertEquals("Byte at " + i + " should be the same",
-          getByte(pos + i), buf[i]);
+          StripedFileTestUtil.getByte(pos + i), buf[i]);
     }
   }
 
   private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
       throws IOException {
-
-    final byte[] expected = generateBytes(fileLength);
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
     Path srcPath = new Path(src);
     DFSTestUtil.writeFile(fs, srcPath, new String(expected));
 
@@ -215,7 +181,7 @@ public class TestWriteReadStripedFile {
   public void testWriteReadUsingWebHdfs() throws Exception {
     int fileLength = blockSize * dataBlocks + cellSize + 123;
 
-    final byte[] expected = generateBytes(fileLength);
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
     FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
         WebHdfsConstants.WEBHDFS_SCHEME);
     Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
@@ -231,7 +197,6 @@ public class TestWriteReadStripedFile {
     verifySeek(fs, srcPath, fileLength);
     verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
     //webhdfs doesn't support bytebuffer read
-
   }
 
   void verifyLength(FileSystem fs, Path srcPath, int fileLength)
@@ -243,152 +208,105 @@ public class TestWriteReadStripedFile {
 
   void verifyPread(FileSystem fs, Path srcPath,  int fileLength,
                    byte[] expected, byte[] buf) throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
-        cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
-        cellSize * dataBlocks, fileLength - 102, fileLength - 1};
-    for (int startOffset : startOffsets) {
-      startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
-      int remaining = fileLength - startOffset;
-      in.readFully(startOffset, buf, 0, remaining);
-      for (int i = 0; i < remaining; i++) {
-        Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
-                "same",
-            expected[startOffset + i], buf[i]);
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
+          cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
+          cellSize * dataBlocks, fileLength - 102, fileLength - 1};
+      for (int startOffset : startOffsets) {
+        startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+        int remaining = fileLength - startOffset;
+        in.readFully(startOffset, buf, 0, remaining);
+        for (int i = 0; i < remaining; i++) {
+          Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
+              "same", expected[startOffset + i], buf[i]);
+        }
       }
     }
-    in.close();
   }
 
   void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
                           byte[] expected, byte[] buf) throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    final byte[] result = new byte[fileLength];
-    int readLen = 0;
-    int ret;
-    do {
-      ret = in.read(buf, 0, buf.length);
-      if (ret > 0) {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      final byte[] result = new byte[fileLength];
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf, 0, buf.length)) >= 0) {
         System.arraycopy(buf, 0, result, readLen, ret);
         readLen += ret;
       }
-    } while (ret >= 0);
-    Assert.assertEquals("The length of file should be the same to write size",
-        fileLength, readLen);
-    Assert.assertArrayEquals(expected, result);
-    in.close();
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result);
+    }
   }
 
 
   void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
                           byte[] expected, ByteBuffer buf) throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    ByteBuffer result = ByteBuffer.allocate(fileLength);
-    int readLen = 0;
-    int ret;
-    do {
-      ret = in.read(buf);
-      if (ret > 0) {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      ByteBuffer result = ByteBuffer.allocate(fileLength);
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf)) >= 0) {
         readLen += ret;
         buf.flip();
         result.put(buf);
         buf.clear();
       }
-    } while (ret >= 0);
-    readLen = readLen >= 0 ? readLen : 0;
-    Assert.assertEquals("The length of file should be the same to write size",
-        fileLength, readLen);
-    Assert.assertArrayEquals(expected, result.array());
-    in.close();
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result.array());
+    }
   }
 
 
   void verifySeek(FileSystem fs, Path srcPath, int fileLength)
       throws IOException {
-    FSDataInputStream in = fs.open(srcPath);
-    // seek to 1/2 of content
-    int pos = fileLength / 2;
-    assertSeekAndRead(in, pos, fileLength);
-
-    // seek to 1/3 of content
-    pos = fileLength / 3;
-    assertSeekAndRead(in, pos, fileLength);
-
-    // seek to 0 pos
-    pos = 0;
-    assertSeekAndRead(in, pos, fileLength);
-
-    if (fileLength > cellSize) {
-      // seek to cellSize boundary
-      pos = cellSize - 1;
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      // seek to 1/2 of content
+      int pos = fileLength / 2;
       assertSeekAndRead(in, pos, fileLength);
-    }
 
-    if (fileLength > cellSize * dataBlocks) {
-      // seek to striped cell group boundary
-      pos = cellSize * dataBlocks - 1;
+      // seek to 1/3 of content
+      pos = fileLength / 3;
       assertSeekAndRead(in, pos, fileLength);
-    }
 
-    if (fileLength > blockSize * dataBlocks) {
-      // seek to striped block group boundary
-      pos = blockSize * dataBlocks - 1;
+      // seek to 0 pos
+      pos = 0;
       assertSeekAndRead(in, pos, fileLength);
-    }
 
-    if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){
-      try {
-        in.seek(-1);
-        Assert.fail("Should be failed if seek to negative offset");
-      } catch (EOFException e) {
-        // expected
+      if (fileLength > cellSize) {
+        // seek to cellSize boundary
+        pos = cellSize - 1;
+        assertSeekAndRead(in, pos, fileLength);
       }
 
-      try {
-        in.seek(fileLength + 1);
-        Assert.fail("Should be failed if seek after EOF");
-      } catch (EOFException e) {
-        // expected
+      if (fileLength > cellSize * dataBlocks) {
+        // seek to striped cell group boundary
+        pos = cellSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
       }
-    }
-    in.close();
-  }
 
-  @Test
-  public void testWritePreadWithDNFailure() throws IOException {
-    final int failedDNIdx = 2;
-    final int length = cellSize * (dataBlocks + 2);
-    Path testPath = new Path("/foo");
-    final byte[] bytes = generateBytes(length);
-    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
-
-    // shut down the DN that holds the last internal data block
-    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
-        cellSize);
-    String name = (locs[0].getNames())[failedDNIdx];
-    for (DataNode dn : cluster.getDataNodes()) {
-      int port = dn.getXferPort();
-      if (name.contains(Integer.toString(port))) {
-        dn.shutdown();
-        break;
+      if (fileLength > blockSize * dataBlocks) {
+        // seek to striped block group boundary
+        pos = blockSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
       }
-    }
 
-    // pread
-    int startOffsetInFile = cellSize * 5;
-    try (FSDataInputStream fsdis = fs.open(testPath)) {
-      byte[] buf = new byte[length];
-      int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
-      Assert.assertEquals("The length of file should be the same to write size",
-          length - startOffsetInFile, readLen);
-
-      byte[] expected = new byte[readLen];
-      for (int i = startOffsetInFile; i < length; i++) {
-        expected[i - startOffsetInFile] = getByte(i);
-      }
-      for (int i = startOffsetInFile; i < length; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same",
-            expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
+      if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
+        try {
+          in.seek(-1);
+          Assert.fail("Should be failed if seek to negative offset");
+        } catch (EOFException e) {
+          // expected
+        }
+
+        try {
+          in.seek(fileLength + 1);
+          Assert.fail("Should be failed if seek after EOF");
+        } catch (EOFException e) {
+          // expected
+        }
       }
     }
   }
