HDFS-8901. Use ByteBuffer in striping positional read. Contributed by Sammi Chen and Kai Zheng.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/401db4fc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/401db4fc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/401db4fc Branch: refs/heads/YARN-3368 Commit: 401db4fc65140979fe7665983e36905e886df971 Parents: 20a20c2 Author: Zhe Zhang <z...@apache.org> Authored: Thu Sep 8 11:54:33 2016 -0700 Committer: Zhe Zhang <z...@apache.org> Committed: Thu Sep 8 11:54:33 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/util/DataChecksum.java | 2 +- .../org/apache/hadoop/hdfs/DFSInputStream.java | 68 ++++--- .../hadoop/hdfs/DFSStripedInputStream.java | 24 ++- .../hadoop/hdfs/util/StripedBlockUtil.java | 177 ++++++++++--------- .../hadoop/hdfs/TestDFSStripedInputStream.java | 121 ++++++++++++- .../hadoop/hdfs/util/TestStripedBlockUtil.java | 22 +-- 6 files changed, 281 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java index 3a53ed9..6982a92 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java @@ -304,7 +304,7 @@ public class DataChecksum implements Checksum { bytesPerChecksum, checksums.array(), crcsOffset, fileName, basePos); return; } - if (NativeCrc32.isAvailable()) { + if (NativeCrc32.isAvailable() && data.isDirect()) { NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data, fileName, basePos); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 7a10ba4..31fa897 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -533,7 +533,8 @@ public class DFSInputStream extends FSInputStream * Open a DataInputStream to a DataNode so that it can be read from. * We get block ID and the IDs of the destinations at startup, from the namenode. */ - private synchronized DatanodeInfo blockSeekTo(long target) throws IOException { + private synchronized DatanodeInfo blockSeekTo(long target) + throws IOException { if (target >= getFileLength()) { throw new IOException("Attempted to read past end of file"); } @@ -962,14 +963,14 @@ public class DFSInputStream extends FSInputStream } protected void fetchBlockByteRange(LocatedBlock block, long start, long end, - byte[] buf, int offset, CorruptedBlocks corruptedBlocks) + ByteBuffer buf, CorruptedBlocks corruptedBlocks) throws IOException { block = refreshLocatedBlock(block); while (true) { DNAddrPair addressPair = chooseDataNode(block, null); try { actualGetFromOneDataNode(addressPair, block, start, end, - buf, offset, corruptedBlocks); + buf, corruptedBlocks); return; } catch (IOException e) { checkInterrupted(e); // check if the read has been interrupted @@ -988,12 +989,10 @@ public class DFSInputStream extends FSInputStream return new Callable<ByteBuffer>() { @Override public ByteBuffer call() throws Exception { - byte[] buf = bb.array(); - int offset = bb.position(); try (TraceScope ignored = dfsClient.getTracer(). newScope("hedgedRead" + hedgedReadId, parentSpanId)) { - actualGetFromOneDataNode(datanode, block, start, end, buf, - offset, corruptedBlocks); + actualGetFromOneDataNode(datanode, block, start, end, bb, + corruptedBlocks); return bb; } } @@ -1007,13 +1006,12 @@ public class DFSInputStream extends FSInputStream * @param block the located block containing the requested data * @param startInBlk the startInBlk offset of the block * @param endInBlk the endInBlk offset of the block - * @param buf the given byte array into which the data is read - * @param offset the offset in buf + * @param buf the given byte buffer into which the data is read * @param corruptedBlocks map recording list of datanodes with corrupted * block replica */ void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block, - final long startInBlk, final long endInBlk, byte[] buf, int offset, + final long startInBlk, final long endInBlk, ByteBuffer buf, CorruptedBlocks corruptedBlocks) throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); @@ -1031,7 +1029,22 @@ public class DFSInputStream extends FSInputStream DFSClientFaultInjector.get().fetchFromDatanodeException(); reader = getBlockReader(block, startInBlk, len, datanode.addr, datanode.storageType, datanode.info); - int nread = reader.readAll(buf, offset, len); + + //Behave exactly as the readAll() call + ByteBuffer tmp = buf.duplicate(); + tmp.limit(tmp.position() + len); + tmp = tmp.slice(); + int nread = 0; + int ret; + while (true) { + ret = reader.read(tmp); + if (ret <= 0) { + break; + } + nread += ret; + } + buf.position(buf.position() + nread); + IOUtilsClient.updateReadStatistics(readStatistics, nread, reader); dfsClient.updateFileSystemReadStats( reader.getNetworkDistance(), nread); @@ -1098,7 +1111,7 @@ public class DFSInputStream extends FSInputStream * time. We then wait on which ever read returns first. */ private void hedgedFetchBlockByteRange(LocatedBlock block, long start, - long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks) + long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks) throws IOException { final DfsClientConf conf = dfsClient.getConf(); ArrayList<Future<ByteBuffer>> futures = new ArrayList<>(); @@ -1130,8 +1143,8 @@ public class DFSInputStream extends FSInputStream conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS); if (future != null) { ByteBuffer result = future.get(); - System.arraycopy(result.array(), result.position(), buf, offset, - len); + result.flip(); + buf.put(result); return; } DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged " @@ -1173,8 +1186,8 @@ public class DFSInputStream extends FSInputStream // cancel the rest. cancelAll(futures); dfsClient.getHedgedReadMetrics().incHedgedReadWins(); - System.arraycopy(result.array(), result.position(), buf, offset, - len); + result.flip(); + buf.put(result); return; } catch (InterruptedException ie) { // Ignore and retry @@ -1244,7 +1257,8 @@ public class DFSInputStream extends FSInputStream * access key from its memory since it's considered expired based on * the estimated expiration date. */ - if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) { + if (ex instanceof InvalidBlockTokenException || + ex instanceof InvalidToken) { DFSClient.LOG.info("Access token was invalid when connecting to " + targetAddr + " : " + ex); return true; @@ -1272,7 +1286,8 @@ public class DFSInputStream extends FSInputStream try (TraceScope scope = dfsClient. newReaderTraceScope("DFSInputStream#byteArrayPread", src, position, length)) { - int retLen = pread(position, buffer, offset, length); + ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length); + int retLen = pread(position, bb); if (retLen < length) { dfsClient.addRetLenToReaderScope(scope, retLen); } @@ -1280,7 +1295,7 @@ public class DFSInputStream extends FSInputStream } } - private int pread(long position, byte[] buffer, int offset, int length) + private int pread(long position, ByteBuffer buffer) throws IOException { // sanity checks dfsClient.checkOpen(); @@ -1292,6 +1307,7 @@ public class DFSInputStream extends FSInputStream if ((position < 0) || (position >= filelen)) { return -1; } + int length = buffer.remaining(); int realLen = length; if ((position + length) > filelen) { realLen = (int)(filelen - position); @@ -1304,14 +1320,16 @@ public class DFSInputStream extends FSInputStream CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); for (LocatedBlock blk : blockRange) { long targetStart = position - blk.getStartOffset(); - long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); + int bytesToRead = (int) Math.min(remaining, + blk.getBlockSize() - targetStart); + long targetEnd = targetStart + bytesToRead - 1; try { if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) { hedgedFetchBlockByteRange(blk, targetStart, - targetStart + bytesToRead - 1, buffer, offset, corruptedBlocks); + targetEnd, buffer, corruptedBlocks); } else { - fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, - buffer, offset, corruptedBlocks); + fetchBlockByteRange(blk, targetStart, targetEnd, + buffer, corruptedBlocks); } } finally { // Check and report if any block replicas are corrupted. @@ -1323,7 +1341,6 @@ public class DFSInputStream extends FSInputStream remaining -= bytesToRead; position += bytesToRead; - offset += bytesToRead; } assert remaining == 0 : "Wrong number of bytes read."; return realLen; @@ -1457,7 +1474,8 @@ public class DFSInputStream extends FSInputStream * If another node could not be found, then returns false. */ @Override - public synchronized boolean seekToNewSource(long targetPos) throws IOException { + public synchronized boolean seekToNewSource(long targetPos) + throws IOException { if (currentNode == null) { return seekToBlockSource(targetPos); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 9ca8005..ccaf6a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -307,8 +307,8 @@ public class DFSStripedInputStream extends DFSInputStream { stripeLimit - stripeBufOffset); LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock; - AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize, - blockGroup, offsetInBlockGroup, + AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, + cellSize, blockGroup, offsetInBlockGroup, offsetInBlockGroup + stripeRange.length - 1, curStripeBuf); final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( blockGroup, cellSize, dataBlkNum, parityBlkNum); @@ -523,13 +523,13 @@ public class DFSStripedInputStream extends DFSInputStream { */ @Override protected void fetchBlockByteRange(LocatedBlock block, long start, - long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks) + long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks) throws IOException { // Refresh the striped block group LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset()); AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes( - ecPolicy, cellSize, blockGroup, start, end, buf, offset); + ecPolicy, cellSize, blockGroup, start, end, buf); CompletionService<Void> readService = new ExecutorCompletionService<>( dfsClient.getStripedReadsThreadPool()); final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( @@ -542,6 +542,7 @@ public class DFSStripedInputStream extends DFSInputStream { blks, preaderInfos, corruptedBlocks); preader.readStripe(); } + buf.position(buf.position() + (int)(end - start + 1)); } finally { for (BlockReaderInfo preaderInfo : preaderInfos) { closeReader(preaderInfo); @@ -698,16 +699,15 @@ public class DFSStripedInputStream extends DFSInputStream { } private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { - if (chunk.byteBuffer != null) { - ByteBufferStrategy strategy = - new ByteBufferStrategy(chunk.byteBuffer, readStatistics, dfsClient); + if (chunk.useByteBuffer()) { + ByteBufferStrategy strategy = new ByteBufferStrategy( + chunk.getByteBuffer(), readStatistics, dfsClient); return new ByteBufferStrategy[]{strategy}; } else { ByteBufferStrategy[] strategies = - new ByteBufferStrategy[chunk.byteArray.getOffsets().length]; + new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()]; for (int i = 0; i < strategies.length; i++) { - ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(), - chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]); + ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i); strategies[i] = new ByteBufferStrategy(buffer, readStatistics, dfsClient); } @@ -814,7 +814,7 @@ public class DFSStripedInputStream extends DFSInputStream { } class PositionStripeReader extends StripeReader { - private byte[][] decodeInputs = null; + private ByteBuffer[] decodeInputs = null; PositionStripeReader(CompletionService<Void> service, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, @@ -836,8 +836,6 @@ public class DFSStripedInputStream extends DFSInputStream { Preconditions.checkState(index >= dataBlkNum && alignedStripe.chunks[index] == null); alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]); - alignedStripe.chunks[index].addByteArraySlice(0, - (int) alignedStripe.getSpanInBlock()); return true; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index c8827d9..4dbbc3d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -73,7 +73,8 @@ import java.util.concurrent.TimeUnit; @InterfaceAudience.Private public class StripedBlockUtil { - public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class); + public static final Logger LOG = + LoggerFactory.getLogger(StripedBlockUtil.class); /** * Parses a striped block group into individual blocks. @@ -312,16 +313,17 @@ public class StripedBlockUtil { * schedule a new fetch request with the decoding input buffer as transfer * destination. */ - public static byte[][] initDecodeInputs(AlignedStripe alignedStripe, + public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) { - byte[][] decodeInputs = - new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()]; + ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + for (int i = 0; i < decodeInputs.length; i++) { + decodeInputs[i] = ByteBuffer.allocate( + (int) alignedStripe.getSpanInBlock()); + } // read the full data aligned stripe for (int i = 0; i < dataBlkNum; i++) { if (alignedStripe.chunks[i] == null) { alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]); - alignedStripe.chunks[i].addByteArraySlice(0, - (int) alignedStripe.getSpanInBlock()); } } return decodeInputs; @@ -334,14 +336,21 @@ public class StripedBlockUtil { * When all pending requests have returned, this method should be called to * finalize decode input buffers. */ - public static void finalizeDecodeInputs(final byte[][] decodeInputs, + public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs, AlignedStripe alignedStripe) { for (int i = 0; i < alignedStripe.chunks.length; i++) { final StripingChunk chunk = alignedStripe.chunks[i]; if (chunk != null && chunk.state == StripingChunk.FETCHED) { - chunk.copyTo(decodeInputs[i]); + if (chunk.useChunkBuffer()) { + chunk.getChunkBuffer().copyTo(decodeInputs[i]); + } else { + chunk.getByteBuffer().flip(); + } } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) { - Arrays.fill(decodeInputs[i], (byte) 0); + //ZERO it. Will be better handled in other following issue. + byte[] emptyBytes = new byte[decodeInputs[i].limit()]; + decodeInputs[i].put(emptyBytes); + decodeInputs[i].flip(); } else { decodeInputs[i] = null; } @@ -351,7 +360,7 @@ public class StripedBlockUtil { /** * Decode based on the given input buffers and erasure coding policy. */ - public static void decodeAndFillBuffer(final byte[][] decodeInputs, + public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs, AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum, RawErasureDecoder decoder) { // Step 1: prepare indices and output buffers for missing data units @@ -364,8 +373,11 @@ public class StripedBlockUtil { } } decodeIndices = Arrays.copyOf(decodeIndices, pos); - byte[][] decodeOutputs = - new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()]; + ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length]; + for (int i = 0; i < decodeOutputs.length; i++) { + decodeOutputs[i] = ByteBuffer.allocate( + (int) alignedStripe.getSpanInBlock()); + } // Step 2: decode into prepared output buffers decoder.decode(decodeInputs, decodeIndices, decodeOutputs); @@ -374,8 +386,8 @@ public class StripedBlockUtil { for (int i = 0; i < decodeIndices.length; i++) { int missingBlkIdx = decodeIndices[i]; StripingChunk chunk = alignedStripe.chunks[missingBlkIdx]; - if (chunk.state == StripingChunk.MISSING) { - chunk.copyFrom(decodeOutputs[i]); + if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) { + chunk.getChunkBuffer().copyFrom(decodeOutputs[i]); } } } @@ -402,7 +414,8 @@ public class StripedBlockUtil { // Step 4: calculate each chunk's position in destination buffer. Since the // whole read range is within a single stripe, the logic is simpler here. - int bufOffset = (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum)); + int bufOffset = + (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum)); for (StripingCell cell : cells) { long cellStart = cell.idxInInternalBlk * cellSize + cell.offset; long cellEnd = cellStart + cell.size - 1; @@ -437,15 +450,14 @@ public class StripedBlockUtil { * @param rangeStartInBlockGroup The byte range's start offset in block group * @param rangeEndInBlockGroup The byte range's end offset in block group * @param buf Destination buffer of the read operation for the byte range - * @param offsetInBuf Start offset into the destination buffer * * At most 5 stripes will be generated from each logical range, as * demonstrated in the header of {@link AlignedStripe}. */ - public static AlignedStripe[] divideByteRangeIntoStripes(ErasureCodingPolicy ecPolicy, + public static AlignedStripe[] divideByteRangeIntoStripes( + ErasureCodingPolicy ecPolicy, int cellSize, LocatedStripedBlock blockGroup, - long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf, - int offsetInBuf) { + long rangeStartInBlockGroup, long rangeEndInBlockGroup, ByteBuffer buf) { // Step 0: analyze range and calculate basic parameters final int dataBlkNum = ecPolicy.getNumDataUnits(); @@ -462,7 +474,7 @@ public class StripedBlockUtil { AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges); // Step 4: calculate each chunk's position in destination buffer - calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf); + calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf); // Step 5: prepare ALLZERO blocks prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum); @@ -476,7 +488,8 @@ public class StripedBlockUtil { * used by {@link DFSStripedOutputStream} in encoding */ @VisibleForTesting - private static StripingCell[] getStripingCellsOfByteRange(ErasureCodingPolicy ecPolicy, + private static StripingCell[] getStripingCellsOfByteRange( + ErasureCodingPolicy ecPolicy, int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup, long rangeEndInBlockGroup) { Preconditions.checkArgument( @@ -511,7 +524,8 @@ public class StripedBlockUtil { * the physical byte range (inclusive) on each stored internal block. */ @VisibleForTesting - private static VerticalRange[] getRangesForInternalBlocks(ErasureCodingPolicy ecPolicy, + private static VerticalRange[] getRangesForInternalBlocks( + ErasureCodingPolicy ecPolicy, int cellSize, StripingCell[] cells) { int dataBlkNum = ecPolicy.getNumDataUnits(); int parityBlkNum = ecPolicy.getNumParityUnits(); @@ -575,8 +589,7 @@ public class StripedBlockUtil { } private static void calcualteChunkPositionsInBuf(int cellSize, - AlignedStripe[] stripes, StripingCell[] cells, byte[] buf, - int offsetInBuf) { + AlignedStripe[] stripes, StripingCell[] cells, ByteBuffer buf) { /** * | <--------------- AlignedStripe --------------->| * @@ -598,6 +611,7 @@ public class StripedBlockUtil { for (StripingCell cell : cells) { long cellStart = cell.idxInInternalBlk * cellSize + cell.offset; long cellEnd = cellStart + cell.size - 1; + StripingChunk chunk; for (AlignedStripe s : stripes) { long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1; long overlapStart = Math.max(cellStart, s.getOffsetInBlock()); @@ -606,11 +620,13 @@ public class StripedBlockUtil { if (overLapLen <= 0) { continue; } - if (s.chunks[cell.idxInStripe] == null) { - s.chunks[cell.idxInStripe] = new StripingChunk(buf); + chunk = s.chunks[cell.idxInStripe]; + if (chunk == null) { + chunk = new StripingChunk(); + s.chunks[cell.idxInStripe] = chunk; } - s.chunks[cell.idxInStripe].addByteArraySlice( - (int)(offsetInBuf + done + overlapStart - cellStart), overLapLen); + chunk.getChunkBuffer().addSlice(buf, + (int) (done + overlapStart - cellStart), overLapLen); } done += cell.size; } @@ -833,88 +849,89 @@ public class StripedBlockUtil { */ public int state = REQUESTED; - public final ChunkByteArray byteArray; - public final ByteBuffer byteBuffer; + private final ChunkByteBuffer chunkBuffer; + private final ByteBuffer byteBuffer; - public StripingChunk(byte[] buf) { - this.byteArray = new ChunkByteArray(buf); + public StripingChunk() { + this.chunkBuffer = new ChunkByteBuffer(); byteBuffer = null; } public StripingChunk(ByteBuffer buf) { - this.byteArray = null; + this.chunkBuffer = null; this.byteBuffer = buf; } public StripingChunk(int state) { - this.byteArray = null; + this.chunkBuffer = null; this.byteBuffer = null; this.state = state; } - public void addByteArraySlice(int offset, int length) { - assert byteArray != null; - byteArray.offsetsInBuf.add(offset); - byteArray.lengthsInBuf.add(length); + public boolean useByteBuffer(){ + return byteBuffer != null; } - void copyTo(byte[] target) { - assert byteArray != null; - byteArray.copyTo(target); + public boolean useChunkBuffer() { + return chunkBuffer != null; } - void copyFrom(byte[] src) { - assert byteArray != null; - byteArray.copyFrom(src); + public ByteBuffer getByteBuffer() { + assert byteBuffer != null; + return byteBuffer; + } + + public ChunkByteBuffer getChunkBuffer() { + assert chunkBuffer != null; + return chunkBuffer; } } - public static class ChunkByteArray { - private final byte[] buf; - private final List<Integer> offsetsInBuf; - private final List<Integer> lengthsInBuf; + /** + * A utility to manage ByteBuffer slices for a reader. + */ + public static class ChunkByteBuffer { + private final List<ByteBuffer> slices; - ChunkByteArray(byte[] buf) { - this.buf = buf; - this.offsetsInBuf = new ArrayList<>(); - this.lengthsInBuf = new ArrayList<>(); + ChunkByteBuffer() { + this.slices = new ArrayList<>(); } - public int[] getOffsets() { - int[] offsets = new int[offsetsInBuf.size()]; - for (int i = 0; i < offsets.length; i++) { - offsets[i] = offsetsInBuf.get(i); - } - return offsets; + public void addSlice(ByteBuffer buffer, int offset, int len) { + ByteBuffer tmp = buffer.duplicate(); + tmp.position(buffer.position() + offset); + tmp.limit(buffer.position() + offset + len); + slices.add(tmp.slice()); } - public int[] getLengths() { - int[] lens = new int[this.lengthsInBuf.size()]; - for (int i = 0; i < lens.length; i++) { - lens[i] = this.lengthsInBuf.get(i); - } - return lens; + public ByteBuffer getSlice(int i) { + return slices.get(i); } - public byte[] buf() { - return buf; + public List<ByteBuffer> getSlices() { + return slices; } - void copyTo(byte[] target) { - int posInBuf = 0; - for (int i = 0; i < offsetsInBuf.size(); i++) { - System.arraycopy(buf, offsetsInBuf.get(i), - target, posInBuf, lengthsInBuf.get(i)); - posInBuf += lengthsInBuf.get(i); + /** + * Note: target will be ready-to-read state after the call. + */ + void copyTo(ByteBuffer target) { + for (ByteBuffer slice : slices) { + slice.flip(); + target.put(slice); } - } - - void copyFrom(byte[] src) { - int srcPos = 0; - for (int j = 0; j < offsetsInBuf.size(); j++) { - System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j), - lengthsInBuf.get(j)); - srcPos += lengthsInBuf.get(j); + target.flip(); + } + + void copyFrom(ByteBuffer src) { + ByteBuffer tmp; + int len; + for (ByteBuffer slice : slices) { + len = slice.remaining(); + tmp = src.duplicate(); + tmp.limit(tmp.position() + len); + slice.put(tmp); + src.position(src.position() + len); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 18c2de9..1e27745 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -57,7 +57,8 @@ import static org.junit.Assert.assertTrue; public class TestDFSStripedInputStream { - public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class); + public static final Log LOG = + LogFactory.getLog(TestDFSStripedInputStream.class); private MiniDFSCluster cluster; private Configuration conf = new Configuration(); @@ -272,12 +273,16 @@ public class TestDFSStripedInputStream { // |10 | done += in.read(0, readBuffer, 0, delta); assertEquals(delta, done); + assertArrayEquals(Arrays.copyOf(expected, done), + Arrays.copyOf(readBuffer, done)); // both head and trail cells are partial // |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 | // |256K - 10|missing|256K|256K|256K - 10|not in range| done += in.read(delta, readBuffer, delta, CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta); assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done); + assertArrayEquals(Arrays.copyOf(expected, done), + Arrays.copyOf(readBuffer, done)); // read the rest done += in.read(done, readBuffer, done, readSize - done); assertEquals(readSize, done); @@ -291,8 +296,8 @@ public class TestDFSStripedInputStream { testStatefulRead(true, true); } - private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket) - throws Exception { + private void testStatefulRead(boolean useByteBuffer, + boolean cellMisalignPacket) throws Exception { final int numBlocks = 2; final int fileSize = numBlocks * BLOCK_GROUP_SIZE; if (cellMisalignPacket) { @@ -302,7 +307,8 @@ public class TestDFSStripedInputStream { } DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, NUM_STRIPE_PER_BLOCK, false); - LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize); + LocatedBlocks lbs = fs.getClient().namenode. + getBlockLocations(filePath.toString(), 0, fileSize); assert lbs.getLocatedBlocks().size() == numBlocks; for (LocatedBlock lb : lbs.getLocatedBlocks()) { @@ -360,4 +366,111 @@ public class TestDFSStripedInputStream { } fs.delete(filePath, true); } + + @Test + public void testStatefulReadWithDNFailure() throws Exception { + final int numBlocks = 4; + final int failedDNIdx = DATA_BLK_NUM - 1; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + NUM_STRIPE_PER_BLOCK, false); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, BLOCK_GROUP_SIZE); + + assert lbs.get(0) instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0)); + for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + NUM_STRIPE_PER_BLOCK * CELLSIZE, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + DFSStripedInputStream in = + new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, + ecPolicy, null); + int readSize = BLOCK_GROUP_SIZE; + byte[] readBuffer = new byte[readSize]; + byte[] expected = new byte[readSize]; + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + for (int j = 0; j < DATA_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; + expected[posInFile] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } + } + } + + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + DATA_BLK_NUM, PARITY_BLK_NUM); + RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(conf, + ecPolicy.getCodecName(), coderOptions); + + // Update the expected content for decoded data + int[] missingBlkIdx = new int[PARITY_BLK_NUM]; + for (int i = 0; i < missingBlkIdx.length; i++) { + if (i == 0) { + missingBlkIdx[i] = failedDNIdx; + } else { + missingBlkIdx[i] = DATA_BLK_NUM + i; + } + } + cluster.stopDataNode(failedDNIdx); + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE]; + byte[][] decodeOutputs = new byte[missingBlkIdx.length][CELLSIZE]; + for (int j = 0; j < DATA_BLK_NUM; j++) { + int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE; + if (j != failedDNIdx) { + System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE); + } + } + for (int j = DATA_BLK_NUM; j < DATA_BLK_NUM + PARITY_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + decodeInputs[j][k] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } + } + for (int m : missingBlkIdx) { + decodeInputs[m] = null; + } + rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs); + int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE; + System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE); + } + + int delta = 10; + int done = 0; + // read a small delta, shouldn't trigger decode + // |cell_0 | + // |10 | + done += in.read(readBuffer, 0, delta); + assertEquals(delta, done); + // both head and trail cells are partial + // |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 | + // |256K - 10|missing|256K|256K|256K - 10|not in range| + while (done < (CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta)) { + int ret = in.read(readBuffer, delta, + CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta); + assertTrue(ret > 0); + done += ret; + } + assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done); + // read the rest + + int restSize; + restSize = readSize - done; + while (done < restSize) { + int ret = in.read(readBuffer, done, restSize); + assertTrue(ret > 0); + done += ret; + } + + assertEquals(readSize, done); + assertArrayEquals(expected, readBuffer); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java index 96fc79c..7d9d7dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java @@ -36,6 +36,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import java.nio.ByteBuffer; import java.util.Random; import static org.junit.Assert.assertEquals; @@ -242,7 +243,8 @@ public class TestStripedBlockUtil { */ @Test public void testDivideByteRangeIntoStripes() { - byte[] assembled = new byte[BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE]; + ByteBuffer assembled = + ByteBuffer.allocate(BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE); for (int bgSize : blockGroupSizes) { LocatedStripedBlock blockGroup = createDummyLocatedBlock(bgSize); byte[][] internalBlkBufs = createInternalBlkBuffers(bgSize); @@ -252,7 +254,7 @@ public class TestStripedBlockUtil { continue; } AlignedStripe[] stripes = divideByteRangeIntoStripes(EC_POLICY, - CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled, 0); + CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled); for (AlignedStripe stripe : stripes) { for (int i = 0; i < DATA_BLK_NUM; i++) { @@ -261,21 +263,21 @@ public class TestStripedBlockUtil { continue; } int done = 0; - for (int j = 0; j < chunk.byteArray.getLengths().length; j++) { - System.arraycopy(internalBlkBufs[i], - (int) stripe.getOffsetInBlock() + done, assembled, - chunk.byteArray.getOffsets()[j], - chunk.byteArray.getLengths()[j]); - done += chunk.byteArray.getLengths()[j]; + int len; + for (ByteBuffer slice : chunk.getChunkBuffer().getSlices()) { + len = slice.remaining(); + slice.put(internalBlkBufs[i], + (int) stripe.getOffsetInBlock() + done, len); + done += len; } } } for (int i = 0; i < brSize; i++) { - if (hashIntToByte(brStart + i) != assembled[i]) { + if (hashIntToByte(brStart + i) != assembled.get(i)) { System.out.println("Oops"); } assertEquals("Byte at " + (brStart + i) + " should be the same", - hashIntToByte(brStart + i), assembled[i]); + hashIntToByte(brStart + i), assembled.get(i)); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org