HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3e6161f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3e6161f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3e6161f0
Branch: refs/heads/HDFS-7285
Commit: 3e6161f083541b9d91f68af0b26f5318c0e026fa
Parents: da2a33f
Author: Zhe Zhang <z...@apache.org>
Authored: Wed Apr 29 23:49:52 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:16:04 2015 -0700
----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
 .../hadoop/hdfs/DFSStripedInputStream.java | 111 +-----------
 .../hadoop/hdfs/util/StripedBlockUtil.java | 174 +++++++++++++++++++
 .../hadoop/hdfs/TestPlanReadPortions.java | 11 +-
 4 files changed, 186 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e6161f0/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 6a9bdee..ca60487 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -146,3 +146,6 @@
 HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream (stateful read). (Jing Zhao via Zhe Zhang)
+
+ HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil.
+ (Zhe Zhang) http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e6161f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 3da7306..0dc98fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hdfs; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions; + import org.apache.hadoop.net.NetUtils; import org.apache.htrace.Span; import org.apache.htrace.Trace; @@ -31,8 +33,6 @@ import org.apache.htrace.TraceScope; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.Set; import java.util.Map; import java.util.HashMap; @@ -69,59 +69,6 @@ import java.util.concurrent.Future; * 3. 
pread with decode support: TODO: will be supported after HDFS-7678 *****************************************************************************/ public class DFSStripedInputStream extends DFSInputStream { - /** - * This method plans the read portion from each block in the stripe - * @param dataBlkNum The number of data blocks in the striping group - * @param cellSize The size of each striping cell - * @param startInBlk Starting offset in the striped block - * @param len Length of the read request - * @param bufOffset Initial offset in the result buffer - * @return array of {@link ReadPortion}, each representing the portion of I/O - * for an individual block in the group - */ - @VisibleForTesting - static ReadPortion[] planReadPortions(final int dataBlkNum, - final int cellSize, final long startInBlk, final int len, int bufOffset) { - ReadPortion[] results = new ReadPortion[dataBlkNum]; - for (int i = 0; i < dataBlkNum; i++) { - results[i] = new ReadPortion(); - } - - // cellIdxInBlk is the index of the cell in the block - // E.g., cell_3 is the 2nd cell in blk_0 - int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum)); - - // blkIdxInGroup is the index of the block in the striped block group - // E.g., blk_2 is the 3rd block in the group - final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum); - results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk + - startInBlk % cellSize; - boolean crossStripe = false; - for (int i = 1; i < dataBlkNum; i++) { - if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) { - cellIdxInBlk++; - crossStripe = true; - } - results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock = - cellSize * cellIdxInBlk; - } - - int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len); - results[blkIdxInGroup].offsetsInBuf.add(bufOffset); - results[blkIdxInGroup].lengths.add(firstCellLen); - results[blkIdxInGroup].readLength += firstCellLen; - - int i = (blkIdxInGroup + 1) % dataBlkNum; - for 
(int done = firstCellLen; done < len; done += cellSize) { - ReadPortion rp = results[i]; - rp.offsetsInBuf.add(done + bufOffset); - final int readLen = Math.min(len - done, cellSize); - rp.lengths.add(readLen); - rp.readLength += readLen; - i = (i + 1) % dataBlkNum; - } - return results; - } private static class ReaderRetryPolicy { private int fetchEncryptionKeyTimes = 1; @@ -520,56 +467,4 @@ public class DFSStripedInputStream extends DFSInputStream { } throw new InterruptedException("let's retry"); } - - - /** - * This class represents the portion of I/O associated with each block in the - * striped block group. - */ - static class ReadPortion { - /** - * startOffsetInBlock - * | - * v - * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->| - * +------------------+------------------+----------------+ - * | cell_0 | cell_3 | cell_6 | <- blk_0 - * +------------------+------------------+----------------+ - * _/ \_______________________ - * | | - * v offsetsInBuf[0] v offsetsInBuf[1] - * +------------------------------------------------------+ - * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf - * | (partial) | (from blk_1 and blk_2) | | - * +------------------------------------------------------+ - */ - private long startOffsetInBlock = 0; - private int readLength = 0; - private final List<Integer> offsetsInBuf = new ArrayList<>(); - private final List<Integer> lengths = new ArrayList<>(); - - int[] getOffsets() { - int[] offsets = new int[offsetsInBuf.size()]; - for (int i = 0; i < offsets.length; i++) { - offsets[i] = offsetsInBuf.get(i); - } - return offsets; - } - - int[] getLengths() { - int[] lens = new int[this.lengths.size()]; - for (int i = 0; i < lens.length; i++) { - lens[i] = this.lengths.get(i); - } - return lens; - } - - int getReadLength() { - return readLength; - } - - long getStartOffsetInBlock() { - return startOffsetInBlock; - } - } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e6161f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index d622d4d..cb6d39a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.util; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -27,6 +28,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * Utility class for analyzing striped block groups */ @@ -134,4 +144,168 @@ public class StripedBlockUtil { + offsetInBlk % cellSize; // partial cell } + /** + * This method plans the read portion from each block in the stripe + * @param dataBlkNum The number of data blocks in the striping group + * @param cellSize The size of each striping cell + * @param startInBlk Starting offset in the striped block + * @param len Length of the read request + * @param bufOffset Initial offset in the result buffer + * @return array of {@link ReadPortion}, each representing the portion of I/O + * for an individual block in the group + */ + 
@VisibleForTesting + public static ReadPortion[] planReadPortions(final int dataBlkNum, + final int cellSize, final long startInBlk, final int len, int bufOffset) { + ReadPortion[] results = new ReadPortion[dataBlkNum]; + for (int i = 0; i < dataBlkNum; i++) { + results[i] = new ReadPortion(); + } + + // cellIdxInBlk is the index of the cell in the block + // E.g., cell_3 is the 2nd cell in blk_0 + int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum)); + + // blkIdxInGroup is the index of the block in the striped block group + // E.g., blk_2 is the 3rd block in the group + final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum); + results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk + + startInBlk % cellSize; + boolean crossStripe = false; + for (int i = 1; i < dataBlkNum; i++) { + if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) { + cellIdxInBlk++; + crossStripe = true; + } + results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock = + cellSize * cellIdxInBlk; + } + + int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len); + results[blkIdxInGroup].offsetsInBuf.add(bufOffset); + results[blkIdxInGroup].lengths.add(firstCellLen); + results[blkIdxInGroup].readLength += firstCellLen; + + int i = (blkIdxInGroup + 1) % dataBlkNum; + for (int done = firstCellLen; done < len; done += cellSize) { + ReadPortion rp = results[i]; + rp.offsetsInBuf.add(done + bufOffset); + final int readLen = Math.min(len - done, cellSize); + rp.lengths.add(readLen); + rp.readLength += readLen; + i = (i + 1) % dataBlkNum; + } + return results; + } + + /** + * Get the next completed striped read task + * + * @return {@link StripedReadResult} indicating the status of the read task + * succeeded, and the block index of the task. If the method times + * out without getting any completed read tasks, -1 is returned as + * block index. 
+ * @throws InterruptedException + */ + public static StripedReadResult getNextCompletedStripedRead( + CompletionService<Void> readService, Map<Future<Void>, + Integer> futures, final long threshold) throws InterruptedException { + Preconditions.checkArgument(!futures.isEmpty()); + Preconditions.checkArgument(threshold > 0); + Future<Void> future = null; + try { + future = readService.poll(threshold, TimeUnit.MILLISECONDS); + if (future != null) { + future.get(); + return new StripedReadResult(futures.remove(future), + StripedReadResult.SUCCESSFUL); + } else { + return new StripedReadResult(StripedReadResult.TIMEOUT); + } + } catch (ExecutionException e) { + return new StripedReadResult(futures.remove(future), + StripedReadResult.FAILED); + } catch (CancellationException e) { + return new StripedReadResult(futures.remove(future), + StripedReadResult.CANCELLED); + } + } + + /** + * This class represents the portion of I/O associated with each block in the + * striped block group. + */ + public static class ReadPortion { + /** + * startOffsetInBlock + * | + * v + * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->| + * +------------------+------------------+----------------+ + * | cell_0 | cell_3 | cell_6 | <- blk_0 + * +------------------+------------------+----------------+ + * _/ \_______________________ + * | | + * v offsetsInBuf[0] v offsetsInBuf[1] + * +------------------------------------------------------+ + * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf + * | (partial) | (from blk_1 and blk_2) | | + * +------------------------------------------------------+ + */ + public long startOffsetInBlock = 0; + public int readLength = 0; + public final List<Integer> offsetsInBuf = new ArrayList<>(); + public final List<Integer> lengths = new ArrayList<>(); + + public int[] getOffsets() { + int[] offsets = new int[offsetsInBuf.size()]; + for (int i = 0; i < offsets.length; i++) { + offsets[i] = offsetsInBuf.get(i); + } + return offsets; + } + + public int[] 
getLengths() { + int[] lens = new int[this.lengths.size()]; + for (int i = 0; i < lens.length; i++) { + lens[i] = this.lengths.get(i); + } + return lens; + } + + public boolean containsReadPortion(ReadPortion rp) { + long end = startOffsetInBlock + readLength; + return startOffsetInBlock <= rp.startOffsetInBlock && end >= + rp.startOffsetInBlock + rp.readLength; + } + } + + /** + * This class represents result from a striped read request. + * If the task was successful or the internal computation failed, + * an index is also returned. + */ + public static class StripedReadResult { + public static final int SUCCESSFUL = 0x01; + public static final int FAILED = 0x02; + public static final int TIMEOUT = 0x04; + public static final int CANCELLED = 0x08; + + public final int index; + public final int state; + + public StripedReadResult(int state) { + Preconditions.checkArgument(state == TIMEOUT, + "Only timeout result should return negative index."); + this.index = -1; + this.state = state; + } + + public StripedReadResult(int index, int state) { + Preconditions.checkArgument(state != TIMEOUT, + "Timeout result should return negative index."); + this.index = index; + this.state = state; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e6161f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java index cf84b30..3b5787a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hdfs; import org.junit.Test; -import static 
org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; import static org.junit.Assert.*; public class TestPlanReadPortions { @@ -32,13 +33,13 @@ public class TestPlanReadPortions { private void testPlanReadPortions(int startInBlk, int length, int bufferOffset, int[] readLengths, int[] offsetsInBlock, int[][] bufferOffsets, int[][] bufferLengths) { - ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE, + ReadPortion[] results = StripedBlockUtil.planReadPortions(GROUP_SIZE, CELLSIZE, startInBlk, length, bufferOffset); assertEquals(GROUP_SIZE, results.length); for (int i = 0; i < GROUP_SIZE; i++) { - assertEquals(readLengths[i], results[i].getReadLength()); - assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock()); + assertEquals(readLengths[i], results[i].readLength); + assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock); final int[] bOffsets = results[i].getOffsets(); assertArrayEquals(bufferOffsets[i], bOffsets); final int[] bLengths = results[i].getLengths(); @@ -47,7 +48,7 @@ public class TestPlanReadPortions { } /** - * Test {@link DFSStripedInputStream#planReadPortions} + * Test {@link StripedBlockUtil#planReadPortions} */ @Test public void testPlanReadPortions() {