HADOOP-12685. Input buffer position after encode/decode not consistent between different kinds of buffers. Contributed by Rui Li.
Change-Id: I713c7b4e3cfae70c04b7e4b292ab53eae348d8d9

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c52b407c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c52b407c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c52b407c

Branch: refs/heads/HDFS-1312
Commit: c52b407cbffc8693738b31c6cc4e71751efd70e8
Parents: 355c0ce
Author: Zhe Zhang <[email protected]>
Authored: Tue Jan 5 16:31:52 2016 -0800
Committer: Zhe Zhang <[email protected]>
Committed: Tue Jan 5 16:32:18 2016 -0800

----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 ++
.../rawcoder/AbstractRawErasureDecoder.java | 55 ++++++++++---------
.../rawcoder/AbstractRawErasureEncoder.java | 56 +++++++++++---------
.../erasurecode/rawcoder/RawErasureDecoder.java | 5 +-
.../erasurecode/rawcoder/RawErasureEncoder.java | 5 +-
.../io/erasurecode/rawcoder/TestRSRawCoder.java | 7 +++
.../erasurecode/rawcoder/TestRawCoderBase.java | 34 ++++++++++++
7 files changed, 112 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 0b88ed0..863d047 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -638,6 +638,9 @@ Trunk (Unreleased)
HADOOP-12544. Erasure Coding: create dummy raw coder to isolate performance issues in testing. (Rui Li via zhz)
+ HADOOP-12685. Input buffer position after encode/decode not consistent
+ between different kinds of buffers.
(Rui Li via zhz) + Release 2.9.0 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java index 2cfb57c..37a9bcd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java @@ -51,39 +51,44 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false); checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); + int[] inputPositions = new int[inputs.length]; + for (int i = 0; i < inputPositions.length; i++) { + if (inputs[i] != null) { + inputPositions[i] = inputs[i].position(); + } + } + if (usingDirectBuffer) { doDecode(inputs, erasedIndexes, outputs); - return; - } + } else { + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + if (buffer != null) { + inputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newInputs[i] = buffer.array(); + } + } - int[] inputOffsets = new int[inputs.length]; - int[] outputOffsets = new int[outputs.length]; - byte[][] newInputs = new byte[inputs.length][]; - byte[][] newOutputs = new 
byte[outputs.length][]; - - ByteBuffer buffer; - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - if (buffer != null) { - inputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newInputs[i] = buffer.array(); + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newOutputs[i] = buffer.array(); } - } - for (int i = 0; i < outputs.length; ++i) { - buffer = outputs[i]; - outputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newOutputs[i] = buffer.array(); + doDecode(newInputs, inputOffsets, dataLen, + erasedIndexes, newOutputs, outputOffsets); } - doDecode(newInputs, inputOffsets, dataLen, - erasedIndexes, newOutputs, outputOffsets); - - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - if (buffer != null) { + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { // dataLen bytes consumed - buffer.position(buffer.position() + dataLen); + inputs[i].position(inputPositions[i] + dataLen); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java index 13c895c..49cc2c4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java @@ -48,34 +48,42 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder checkParameterBuffers(inputs, false, dataLen, 
usingDirectBuffer, false); checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); - if (usingDirectBuffer) { - doEncode(inputs, outputs); - return; + int[] inputPositions = new int[inputs.length]; + for (int i = 0; i < inputPositions.length; i++) { + if (inputs[i] != null) { + inputPositions[i] = inputs[i].position(); + } } - int[] inputOffsets = new int[inputs.length]; - int[] outputOffsets = new int[outputs.length]; - byte[][] newInputs = new byte[inputs.length][]; - byte[][] newOutputs = new byte[outputs.length][]; - - ByteBuffer buffer; - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - inputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newInputs[i] = buffer.array(); - } - - for (int i = 0; i < outputs.length; ++i) { - buffer = outputs[i]; - outputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newOutputs[i] = buffer.array(); + if (usingDirectBuffer) { + doEncode(inputs, outputs); + } else { + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + inputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newInputs[i] = buffer.array(); + } + + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newOutputs[i] = buffer.array(); + } + + doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets); } - doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets); - - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - buffer.position(buffer.position() + dataLen); // dataLen bytes consumed + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { + // dataLen bytes consumed + inputs[i].position(inputPositions[i] + dataLen); + } } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java index ab322fa..1707650 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java @@ -56,9 +56,10 @@ public interface RawErasureDecoder extends RawErasureCoder { * * If the coder option ALLOW_CHANGE_INPUTS is set true (false by default), the * content of input buffers may change after the call, subject to concrete - * implementation. Anyway the positions of input buffers will move forward. + * implementation. * - * @param inputs input buffers to read data from + * @param inputs input buffers to read data from. 
The buffers' remaining will + * be 0 after decoding * @param erasedIndexes indexes of erased units in the inputs array * @param outputs output buffers to put decoded data into according to * erasedIndexes, ready for read after the call http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java index 91ef714..6303d82 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java @@ -42,8 +42,9 @@ public interface RawErasureEncoder extends RawErasureCoder { * content of input buffers may change after the call, subject to concrete * implementation. Anyway the positions of input buffers will move forward. * - * @param inputs input buffers to read data from - * @param outputs output buffers to put the encoded data into, read to read + * @param inputs input buffers to read data from. 
The buffers' remaining will + * be 0 after encoding + * @param outputs output buffers to put the encoded data into, ready to read * after the call */ void encode(ByteBuffer[] inputs, ByteBuffer[] outputs); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java index a35a4dd..3e37e17 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java @@ -115,4 +115,11 @@ public class TestRSRawCoder extends TestRSRawCoderBase { prepare(null, 10, 4, new int[] {0}, new int[] {0}); testCodingDoMixAndTwice(); } + + @Test + public void testCodingInputBufferPosition() { + prepare(null, 6, 3, new int[]{0}, new int[]{0}); + testInputPosition(false); + testInputPosition(true); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java index 9b6a196..cf77539 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java +++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java @@ -254,4 +254,38 @@ public abstract class TestRawCoderBase extends TestCoderBase { decoder.setConf(getConf()); return decoder; } + + /** + * Tests that the input buffer's position is moved to the end after + * encode/decode. + */ + protected void testInputPosition(boolean usingDirectBuffer) { + this.usingDirectBuffer = usingDirectBuffer; + prepareCoders(); + prepareBufferAllocator(false); + + // verify encode + ECChunk[] dataChunks = prepareDataChunksForEncoding(); + ECChunk[] parityChunks = prepareParityChunksForEncoding(); + ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks); + encoder.encode(dataChunks, parityChunks); + verifyBufferPositionAtEnd(dataChunks); + + // verify decode + backupAndEraseChunks(clonedDataChunks, parityChunks); + ECChunk[] inputChunks = prepareInputChunksForDecoding( + clonedDataChunks, parityChunks); + ensureOnlyLeastRequiredChunks(inputChunks); + ECChunk[] recoveredChunks = prepareOutputChunksForDecoding(); + decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks); + verifyBufferPositionAtEnd(inputChunks); + } + + private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) { + for (ECChunk chunk : inputChunks) { + if (chunk != null) { + Assert.assertEquals(0, chunk.getBuffer().remaining()); + } + } + } }
