HADOOP-11828. Implement the Hitchhiker erasure coding algorithm. Contributed by Jack Liu Quan.
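For orientation before the diff: Hitchhiker-XOR splits every stripe into two sub-stripes and partitions the data units into (numParityUnits - 1) piggyback sets; the XOR of each set's first sub-stripe is folded into one parity unit of the second sub-stripe, which is what lets a single lost data block be rebuilt while reading fewer bytes. The standalone sketch below only illustrates that grouping for the 10+4 schema exercised by the new tests; it mirrors the index helpers added in HHUtil further down and is not a separate API.

    import java.util.Arrays;

    public class PiggyBackIndexDemo {
      public static void main(String[] args) {
        int numDataUnits = 10, numParityUnits = 4;
        // Each of the (numParityUnits - 1) piggyback sets covers roughly
        // numDataUnits / (numParityUnits - 1) consecutive data units.
        int piggyBackSize = numDataUnits / (numParityUnits - 1);   // = 3
        int[] piggyBackIndex = new int[numParityUnits];
        for (int i = 0; i < numDataUnits; ++i) {
          if (i % piggyBackSize == 0) {
            piggyBackIndex[i / piggyBackSize] = i;
          }
        }
        piggyBackIndex[numParityUnits - 1] = numDataUnits;
        // Set boundaries over the data units: [0, 3, 6, 10]
        System.out.println(Arrays.toString(piggyBackIndex));

        // Map each data unit to the piggyback set (and hence the parity of the
        // second sub-stripe) that carries its piggyback.
        int[] piggyBackFullIndex = new int[numDataUnits];
        for (int i = 1; i < piggyBackIndex.length; ++i) {
          for (int j = piggyBackIndex[i - 1]; j < piggyBackIndex[i]; ++j) {
            piggyBackFullIndex[j] = i;
          }
        }
        // [1, 1, 1, 2, 2, 2, 3, 3, 3, 3]: e.g. losing data unit 4 means its
        // piggyback rides on parity unit 2 of the second sub-stripe.
        System.out.println(Arrays.toString(piggyBackFullIndex));
      }
    }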
Change-Id: If43475ccc2574df60949c947af562722db076251 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1bb31fb2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1bb31fb2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1bb31fb2 Branch: refs/heads/YARN-1011 Commit: 1bb31fb22e6f8e6df8e9ff4e94adf20308b4c743 Parents: 2ac39ca Author: Zhe Zhang <z...@apache.org> Authored: Thu Jan 21 10:30:05 2016 -0800 Committer: Zhe Zhang <z...@apache.org> Committed: Thu Jan 21 10:30:05 2016 -0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../coder/AbstractErasureDecoder.java | 2 +- .../coder/AbstractHHErasureCodingStep.java | 49 +++ .../erasurecode/coder/HHXORErasureDecoder.java | 95 +++++ .../coder/HHXORErasureDecodingStep.java | 349 +++++++++++++++++++ .../erasurecode/coder/HHXORErasureEncoder.java | 92 +++++ .../coder/HHXORErasureEncodingStep.java | 146 ++++++++ .../io/erasurecode/coder/util/HHUtil.java | 216 ++++++++++++ .../erasurecode/coder/TestErasureCoderBase.java | 4 +- .../coder/TestHHErasureCoderBase.java | 61 ++++ .../coder/TestHHXORErasureCoder.java | 120 +++++++ 11 files changed, 1134 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 19ef66e..e5172d5 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -262,6 +262,9 @@ Trunk (Unreleased) HADOOP-11887. Introduce Intel ISA-L erasure coding library for native erasure encoding support (Kai Zheng via Colin P. McCabe) + HADOOP-11828. Implement the Hitchhiker erasure coding algorithm. + (Jack Liuquan via zhz) + BUG FIXES HADOOP-12617. SPNEGO authentication request to non-default realm gets http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java index abada3d..d976dd1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureDecoder.java @@ -56,7 +56,7 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder { * We have all the data blocks and parity blocks as input blocks for * recovering by default. 
It's codec specific * @param blockGroup - * @return + * @return input blocks */ protected ECBlock[] getInputBlocks(ECBlockGroup blockGroup) { ECBlock[] inputBlocks = new ECBlock[getNumDataUnits() + http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractHHErasureCodingStep.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractHHErasureCodingStep.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractHHErasureCodingStep.java new file mode 100644 index 0000000..e577c5d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractHHErasureCodingStep.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ECBlock; + +/** + * Abstract class for Hitchhiker common facilities shared by + * {@link HHXORErasureEncodingStep}and {@link HHXORErasureDecodingStep}. + * + * It implements {@link AbstractErasureCodingStep}. + */ +@InterfaceAudience.Private +public abstract class AbstractHHErasureCodingStep + extends AbstractErasureCodingStep { + + private static final int SUB_PACKET_SIZE = 2; + + /** + * Constructor given input blocks and output blocks. + * + * @param inputBlocks + * @param outputBlocks + */ + public AbstractHHErasureCodingStep(ECBlock[] inputBlocks, + ECBlock[] outputBlocks) { + super(inputBlocks, outputBlocks); + } + + protected int getSubPacketSize() { + return SUB_PACKET_SIZE; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java new file mode 100644 index 0000000..ac4df16 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ECBlock; +import org.apache.hadoop.io.erasurecode.ECBlockGroup; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.*; + +/** + * Hitchhiker is a new erasure coding algorithm developed as a research project + * at UC Berkeley by Rashmi Vinayak. + * It has been shown to reduce network traffic and disk I/O by 25%-45% during + * data reconstruction while retaining the same storage capacity and failure + * tolerance capability of RS codes. + * The Hitchhiker algorithm is described in K.V.Rashmi, et al., + * "A "Hitchhiker's" Guide to Fast and Efficient Data Reconstruction in + * Erasure-coded Data Centers", in ACM SIGCOMM 2014. + * This is Hitchhiker-XOR erasure decoder that decodes a block group. + */ +@InterfaceAudience.Private +public class HHXORErasureDecoder extends AbstractErasureDecoder { + private RawErasureDecoder rsRawDecoder; + private RawErasureEncoder xorRawEncoder; + + public HHXORErasureDecoder(int numDataUnits, int numParityUnits) { + super(numDataUnits, numParityUnits); + } + + public HHXORErasureDecoder(ECSchema schema) { + super(schema); + } + + @Override + protected ErasureCodingStep prepareDecodingStep( + final ECBlockGroup blockGroup) { + + RawErasureDecoder rawDecoder; + RawErasureEncoder rawEncoder; + + ECBlock[] inputBlocks = getInputBlocks(blockGroup); + ECBlock[] outputBlocks = getOutputBlocks(blockGroup); + + rawDecoder = checkCreateRSRawDecoder(); + rawEncoder = checkCreateXorRawEncoder(); + + return new HHXORErasureDecodingStep(inputBlocks, + getErasedIndexes(inputBlocks), outputBlocks, rawDecoder, + rawEncoder); + } + + private RawErasureDecoder checkCreateRSRawDecoder() { + if (rsRawDecoder == null) { + rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(), + getNumDataUnits(), getNumParityUnits()); + } + return rsRawDecoder; + } + + private RawErasureEncoder checkCreateXorRawEncoder() { + if (xorRawEncoder == null) { + xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(), + getNumDataUnits(), getNumParityUnits()); + xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false); + } + return xorRawEncoder; + } + + @Override + public void release() { + if (rsRawDecoder != null) { + rsRawDecoder.release(); + } + if (xorRawEncoder != null) { + xorRawEncoder.release(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecodingStep.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecodingStep.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecodingStep.java new file mode 100644 index 0000000..6c81836 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecodingStep.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ECBlock; +import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.coder.util.HHUtil; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; + +/** + * Hitchhiker-XOR Erasure decoding step, a wrapper of all the necessary + * information to perform a decoding step involved in the whole process of + * decoding a block group. + */ +@InterfaceAudience.Private +public class HHXORErasureDecodingStep extends AbstractHHErasureCodingStep { + private int pbIndex; + private int[] piggyBackIndex; + private int[] piggyBackFullIndex; + private int[] erasedIndexes; + private RawErasureDecoder rsRawDecoder; + private RawErasureEncoder xorRawEncoder; + + /** + * The constructor with all the necessary info. 
+ * @param inputBlocks + * @param erasedIndexes the indexes of erased blocks in inputBlocks array + * @param outputBlocks + * @param rawDecoder underlying RS decoder for hitchhiker decoding + * @param rawEncoder underlying XOR encoder for hitchhiker decoding + */ + public HHXORErasureDecodingStep(ECBlock[] inputBlocks, int[] erasedIndexes, + ECBlock[] outputBlocks, RawErasureDecoder rawDecoder, + RawErasureEncoder rawEncoder) { + super(inputBlocks, outputBlocks); + this.pbIndex = rawDecoder.getNumParityUnits() - 1; + this.erasedIndexes = erasedIndexes; + this.rsRawDecoder = rawDecoder; + this.xorRawEncoder = rawEncoder; + + this.piggyBackIndex = HHUtil.initPiggyBackIndexWithoutPBVec( + rawDecoder.getNumDataUnits(), rawDecoder.getNumParityUnits()); + this.piggyBackFullIndex = HHUtil.initPiggyBackFullIndexVec( + rawDecoder.getNumDataUnits(), piggyBackIndex); + } + + @Override + public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) { + if (erasedIndexes.length == 0) { + return; + } + + ByteBuffer[] inputBuffers = ECChunk.toBuffers(inputChunks); + ByteBuffer[] outputBuffers = ECChunk.toBuffers(outputChunks); + performCoding(inputBuffers, outputBuffers); + } + + private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) { + final int numDataUnits = rsRawDecoder.getNumDataUnits(); + final int numParityUnits = rsRawDecoder.getNumParityUnits(); + final int numTotalUnits = numDataUnits + numParityUnits; + final int subPacketSize = getSubPacketSize(); + + ByteBuffer fisrtValidInput = HHUtil.findFirstValidInput(inputs); + final int bufSize = fisrtValidInput.remaining(); + + if (inputs.length != numTotalUnits * getSubPacketSize()) { + throw new IllegalArgumentException("Invalid inputs length"); + } + + if (outputs.length != erasedIndexes.length * getSubPacketSize()) { + throw new IllegalArgumentException("Invalid outputs length"); + } + + // notes:inputs length = numDataUnits * subPacketizationSize + // first numDataUnits length is first sub-stripe, + // second numDataUnits length is second sub-stripe + ByteBuffer[][] newIn = new ByteBuffer[subPacketSize][numTotalUnits]; + for (int i = 0; i < subPacketSize; ++i) { + for (int j = 0; j < numTotalUnits; ++j) { + newIn[i][j] = inputs[i * numTotalUnits + j]; + } + } + + ByteBuffer[][] newOut = new ByteBuffer[subPacketSize][erasedIndexes.length]; + for (int i = 0; i < subPacketSize; ++i) { + for (int j = 0; j < erasedIndexes.length; ++j) { + newOut[i][j] = outputs[i * erasedIndexes.length + j]; + } + } + + if (erasedIndexes.length == 1 && erasedIndexes[0] < numDataUnits) { + // Only reconstruct one data unit missing + doDecodeSingle(newIn, newOut, erasedIndexes[0], bufSize, + fisrtValidInput.isDirect()); + } else { + doDecodeMultiAndParity(newIn, newOut, erasedIndexes, bufSize); + } + } + + private void doDecodeSingle(ByteBuffer[][] inputs, ByteBuffer[][] outputs, + int erasedLocationToFix, int bufSize, + boolean isDirect) { + final int numDataUnits = rsRawDecoder.getNumDataUnits(); + final int numParityUnits = rsRawDecoder.getNumParityUnits(); + final int subPacketSize = getSubPacketSize(); + + int[][] inputPositions = new int[subPacketSize][inputs[0].length]; + for (int i = 0; i < subPacketSize; ++i) { + for (int j = 0; j < inputs[i].length; ++j) { + if (inputs[i][j] != null) { + inputPositions[i][j] = inputs[i][j].position(); + } + } + } + + ByteBuffer[] tempInputs = new ByteBuffer[numDataUnits + numParityUnits]; + for (int i = 0; i < tempInputs.length; ++i) { + tempInputs[i] = inputs[1][i]; + } + + ByteBuffer[][] tmpOutputs 
= new ByteBuffer[subPacketSize][numParityUnits]; + for (int i = 0; i < getSubPacketSize(); ++i) { + for (int j = 0; j < erasedIndexes.length; ++j) { + tmpOutputs[i][j] = outputs[i][j]; + } + + for (int m = erasedIndexes.length; m < numParityUnits; ++m) { + tmpOutputs[i][m] = HHUtil.allocateByteBuffer(isDirect, bufSize); + } + } + + // First consider the second subPacket + int[] erasedLocation = new int[numParityUnits]; + erasedLocation[0] = erasedLocationToFix; + + // assign the erased locations based on the locations not read for + // second subPacket but from decoding + for (int i = 1; i < numParityUnits; i++) { + erasedLocation[i] = numDataUnits + i; + tempInputs[numDataUnits + i] = null; + } + + rsRawDecoder.decode(tempInputs, erasedLocation, tmpOutputs[1]); + + int piggyBackParityIndex = piggyBackFullIndex[erasedLocationToFix]; + ByteBuffer piggyBack = HHUtil.getPiggyBackForDecode(inputs, tmpOutputs, + piggyBackParityIndex, numDataUnits, numParityUnits, pbIndex); + + // Second consider the first subPacket. + // get the value of the piggyback associated with the erased location + if (isDirect) { + // decode the erased value in the first subPacket by using the piggyback + int idxToWrite = 0; + doDecodeByPiggyBack(inputs[0], tmpOutputs[0][idxToWrite], piggyBack, + erasedLocationToFix); + } else { + ByteBuffer buffer; + byte[][][] newInputs = new byte[getSubPacketSize()][inputs[0].length][]; + int[][] inputOffsets = new int[getSubPacketSize()][inputs[0].length]; + byte[][][] newOutputs = new byte[getSubPacketSize()][numParityUnits][]; + int[][] outOffsets = new int[getSubPacketSize()][numParityUnits]; + + for (int i = 0; i < getSubPacketSize(); ++i) { + for (int j = 0; j < inputs[0].length; ++j) { + buffer = inputs[i][j]; + if (buffer != null) { + inputOffsets[i][j] = buffer.arrayOffset() + buffer.position(); + newInputs[i][j] = buffer.array(); + } + } + } + + for (int i = 0; i < getSubPacketSize(); ++i) { + for (int j = 0; j < numParityUnits; ++j) { + buffer = tmpOutputs[i][j]; + if (buffer != null) { + outOffsets[i][j] = buffer.arrayOffset() + buffer.position(); + newOutputs[i][j] = buffer.array(); + } + } + } + + byte[] newPiggyBack = piggyBack.array(); + + // decode the erased value in the first subPacket by using the piggyback + int idxToWrite = 0; + doDecodeByPiggyBack(newInputs[0], inputOffsets[0], + newOutputs[0][idxToWrite], outOffsets[0][idxToWrite], + newPiggyBack, erasedLocationToFix, bufSize); + } + + for (int i = 0; i < subPacketSize; ++i) { + for (int j = 0; j < inputs[i].length; ++j) { + if (inputs[i][j] != null) { + inputs[i][j].position(inputPositions[i][j] + bufSize); + } + } + } + } + + private void doDecodeByPiggyBack(ByteBuffer[] inputs, + ByteBuffer outputs, + ByteBuffer piggyBack, + int erasedLocationToFix) { + final int thisPiggyBackSetIdx = piggyBackFullIndex[erasedLocationToFix]; + final int startIndex = piggyBackIndex[thisPiggyBackSetIdx - 1]; + final int endIndex = piggyBackIndex[thisPiggyBackSetIdx]; + + // recover first sub-stripe data by XOR piggyback + int bufSize = piggyBack.remaining(); + for (int i = piggyBack.position(); + i < piggyBack.position() + bufSize; i++) { + for (int j = startIndex; j < endIndex; j++) { + if (inputs[j] != null) { + piggyBack.put(i, (byte) + (piggyBack.get(i) ^ inputs[j].get(inputs[j].position() + i))); + } + } + outputs.put(outputs.position() + i, piggyBack.get(i)); + } + } + + private void doDecodeByPiggyBack(byte[][] inputs, int[] inputOffsets, + byte[] outputs, int outOffset, + byte[] piggyBack, int erasedLocationToFix, + 
int bufSize) { + final int thisPiggyBackSetIdx = piggyBackFullIndex[erasedLocationToFix]; + final int startIndex = piggyBackIndex[thisPiggyBackSetIdx - 1]; + final int endIndex = piggyBackIndex[thisPiggyBackSetIdx]; + + // recover first sub-stripe data by XOR piggyback + for (int i = 0; i < bufSize; i++) { + for (int j = startIndex; j < endIndex; j++) { + if (inputs[j] != null) { + piggyBack[i] = (byte) (piggyBack[i] ^ inputs[j][i + inputOffsets[j]]); + } + } + outputs[i + outOffset] = piggyBack[i]; + } + } + + private void doDecodeMultiAndParity(ByteBuffer[][] inputs, + ByteBuffer[][] outputs, + int[] erasedLocationToFix, int bufSize) { + final int numDataUnits = rsRawDecoder.getNumDataUnits(); + final int numParityUnits = rsRawDecoder.getNumParityUnits(); + final int numTotalUnits = numDataUnits + numParityUnits; + int[] parityToFixFlag = new int[numTotalUnits]; + + for (int i = 0; i < erasedLocationToFix.length; ++i) { + if (erasedLocationToFix[i] >= numDataUnits) { + parityToFixFlag[erasedLocationToFix[i]] = 1; + } + } + + int[] inputPositions = new int[inputs[0].length]; + for (int i = 0; i < inputPositions.length; i++) { + if (inputs[0][i] != null) { + inputPositions[i] = inputs[0][i].position(); + } + } + + // decoded first sub-stripe + rsRawDecoder.decode(inputs[0], erasedLocationToFix, outputs[0]); + + for (int i = 0; i < inputs[0].length; i++) { + if (inputs[0][i] != null) { + // dataLen bytes consumed + inputs[0][i].position(inputPositions[i]); + } + } + + ByteBuffer[] tempInput = new ByteBuffer[numDataUnits]; + for (int i = 0; i < numDataUnits; ++i) { + tempInput[i] = inputs[0][i]; +// +// if (!isDirect && tempInput[i] != null) { +// tempInput[i].position(tempInput[i].position() - bufSize); +// } + } + + for (int i = 0; i < erasedLocationToFix.length; ++i) { + if (erasedLocationToFix[i] < numDataUnits) { + tempInput[erasedLocationToFix[i]] = outputs[0][i]; + } + } + + ByteBuffer[] piggyBack = HHUtil.getPiggyBacksFromInput(tempInput, + piggyBackIndex, numParityUnits, 0, xorRawEncoder); + + for (int j = numDataUnits + 1; j < numTotalUnits; ++j) { + if (parityToFixFlag[j] == 0 && inputs[1][j] != null) { + // f(b) + f(a1,a2,a3....) 
+ for (int k = inputs[1][j].position(), + m = piggyBack[j - numDataUnits - 1].position(); + k < inputs[1][j].limit(); ++k, ++m) { + inputs[1][j].put(k, (byte) + (inputs[1][j].get(k) ^ + piggyBack[j - numDataUnits - 1].get(m))); + } + } + } + + // decoded second sub-stripe + rsRawDecoder.decode(inputs[1], erasedLocationToFix, outputs[1]); + + // parity index = 0, the data have no piggyBack + for (int j = 0; j < erasedLocationToFix.length; ++j) { + if (erasedLocationToFix[j] < numTotalUnits + && erasedLocationToFix[j] > numDataUnits) { + int parityIndex = erasedLocationToFix[j] - numDataUnits - 1; + for (int k = outputs[1][j].position(), + m = piggyBack[parityIndex].position(); + k < outputs[1][j].limit(); ++k, ++m) { + outputs[1][j].put(k, (byte) + (outputs[1][j].get(k) ^ piggyBack[parityIndex].get(m))); + } + } + } + + for (int i = 0; i < inputs[0].length; i++) { + if (inputs[0][i] != null) { + // dataLen bytes consumed + inputs[0][i].position(inputPositions[i] + bufSize); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java new file mode 100644 index 0000000..f30572f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ECBlock; +import org.apache.hadoop.io.erasurecode.ECBlockGroup; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.CoderOption; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; + +/** + * Hitchhiker is a new erasure coding algorithm developed as a research project + * at UC Berkeley by Rashmi Vinayak. + * It has been shown to reduce network traffic and disk I/O by 25%-45% during + * data reconstruction while retaining the same storage capacity and failure + * tolerance capability of RS codes. + * The Hitchhiker algorithm is described in K.V.Rashmi, et al., + * "A "Hitchhiker's" Guide to Fast and Efficient Data Reconstruction in + * Erasure-coded Data Centers", in ACM SIGCOMM 2014. + * This is Hitchhiker-XOR erasure encoder that encodes a block group. 
+ */ +@InterfaceAudience.Private +public class HHXORErasureEncoder extends AbstractErasureEncoder { + private RawErasureEncoder rsRawEncoder; + private RawErasureEncoder xorRawEncoder; + + public HHXORErasureEncoder(int numDataUnits, int numParityUnits) { + super(numDataUnits, numParityUnits); + } + + public HHXORErasureEncoder(ECSchema schema) { + super(schema); + } + + @Override + protected ErasureCodingStep prepareEncodingStep( + final ECBlockGroup blockGroup) { + + RawErasureEncoder rsRawEncoderTmp = checkCreateRSRawEncoder(); + RawErasureEncoder xorRawEncoderTmp = checkCreateXorRawEncoder(); + + ECBlock[] inputBlocks = getInputBlocks(blockGroup); + + return new HHXORErasureEncodingStep(inputBlocks, + getOutputBlocks(blockGroup), rsRawEncoderTmp, xorRawEncoderTmp); + } + + private RawErasureEncoder checkCreateRSRawEncoder() { + if (rsRawEncoder == null) { + rsRawEncoder = CodecUtil.createRSRawEncoder(getConf(), + getNumDataUnits(), getNumParityUnits()); + } + return rsRawEncoder; + } + + private RawErasureEncoder checkCreateXorRawEncoder() { + if (xorRawEncoder == null) { + xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(), + getNumDataUnits(), getNumParityUnits()); + xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false); + } + return xorRawEncoder; + } + + @Override + public void release() { + if (rsRawEncoder != null) { + rsRawEncoder.release(); + } + if (xorRawEncoder != null) { + xorRawEncoder.release(); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncodingStep.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncodingStep.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncodingStep.java new file mode 100644 index 0000000..f83ee26 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncodingStep.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ECBlock; +import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.coder.util.HHUtil; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; + +/** + * Hitchhiker-XOR Erasure encoding step, a wrapper of all the necessary + * information to perform an encoding step involved in the whole process of + * encoding a block group. 
+ */ +@InterfaceAudience.Private +public class HHXORErasureEncodingStep extends AbstractHHErasureCodingStep { + private int[] piggyBackIndex; + private RawErasureEncoder rsRawEncoder; + private RawErasureEncoder xorRawEncoder; + + /** + * The constructor with all the necessary info. + * + * @param inputBlocks + * @param outputBlocks + * @param rsRawEncoder underlying RS encoder for hitchhiker encoding + * @param xorRawEncoder underlying XOR encoder for hitchhiker encoding + */ + public HHXORErasureEncodingStep(ECBlock[] inputBlocks, ECBlock[] outputBlocks, + RawErasureEncoder rsRawEncoder, + RawErasureEncoder xorRawEncoder) { + super(inputBlocks, outputBlocks); + + this.rsRawEncoder = rsRawEncoder; + this.xorRawEncoder = xorRawEncoder; + piggyBackIndex = HHUtil.initPiggyBackIndexWithoutPBVec( + rsRawEncoder.getNumDataUnits(), rsRawEncoder.getNumParityUnits()); + } + + @Override + public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) { + ByteBuffer[] inputBuffers = ECChunk.toBuffers(inputChunks); + ByteBuffer[] outputBuffers = ECChunk.toBuffers(outputChunks); + performCoding(inputBuffers, outputBuffers); + } + + private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) { + final int numDataUnits = this.rsRawEncoder.getNumDataUnits(); + final int numParityUnits = this.rsRawEncoder.getNumParityUnits(); + final int subPacketSize = getSubPacketSize(); + + // inputs length = numDataUnits * subPacketSize + if (inputs.length != numDataUnits * subPacketSize) { + throw new IllegalArgumentException("Invalid inputs length"); + } + + if (outputs.length != numParityUnits * subPacketSize) { + throw new IllegalArgumentException("Invalid outputs length"); + } + + // first numDataUnits length is first sub-stripe, + // second numDataUnits length is second sub-stripe + ByteBuffer[][] hhInputs = new ByteBuffer[subPacketSize][numDataUnits]; + for (int i = 0; i < subPacketSize; ++i) { + for (int j = 0; j < numDataUnits; ++j) { + hhInputs[i][j] = inputs[i * numDataUnits + j]; + } + } + + ByteBuffer[][] hhOutputs = new ByteBuffer[subPacketSize][numParityUnits]; + for (int i = 0; i < subPacketSize; ++i) { + for (int j = 0; j < numParityUnits; ++j) { + hhOutputs[i][j] = outputs[i * numParityUnits + j]; + } + } + + doEncode(hhInputs, hhOutputs); + } + + private void doEncode(ByteBuffer[][] inputs, ByteBuffer[][] outputs) { + final int numParityUnits = this.rsRawEncoder.getNumParityUnits(); + + // calc piggyBacks using first sub-packet + ByteBuffer[] piggyBacks = HHUtil.getPiggyBacksFromInput(inputs[0], + piggyBackIndex, numParityUnits, 0, xorRawEncoder); + + // Step1: RS encode each byte-stripe of sub-packets + for (int i = 0; i < getSubPacketSize(); ++i) { + rsRawEncoder.encode(inputs[i], outputs[i]); + } + + // Step2: Adding piggybacks to the parities + // Only second sub-packet is added with a piggyback. 
+ encodeWithPiggyBacks(piggyBacks, outputs, numParityUnits, + inputs[0][0].isDirect()); + } + + private void encodeWithPiggyBacks(ByteBuffer[] piggyBacks, + ByteBuffer[][] outputs, + int numParityUnits, + boolean bIsDirect) { + if (!bIsDirect) { + for (int i = 0; i < numParityUnits - 1; i++) { + int parityIndex = i + 1; + int bufSize = piggyBacks[i].remaining(); + byte[] newOut = outputs[1][parityIndex].array(); + int offset = outputs[1][parityIndex].arrayOffset() + + outputs[1][parityIndex].position(); + + for (int k = offset, j = 0; j < bufSize; k++, j++) { + newOut[k] = (byte) (newOut[k] ^ piggyBacks[i].get(j)); + } + } + return; + } + + for (int i = 0; i < numParityUnits - 1; i++) { + int parityIndex = i + 1; + for (int k = piggyBacks[i].position(), + m = outputs[1][parityIndex].position(); + k < piggyBacks[i].limit(); k++, m++) { + outputs[1][parityIndex].put(m, + (byte) (outputs[1][parityIndex].get(m) ^ piggyBacks[i].get(k))); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/util/HHUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/util/HHUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/util/HHUtil.java new file mode 100644 index 0000000..cfb567e3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/util/HHUtil.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder.util; + +import java.nio.ByteBuffer; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; + +/** + * Some utilities for Hitchhiker coding. 
+ */ +@InterfaceAudience.Private +public final class HHUtil { + private HHUtil() { + // Not to be called + } + + public static int[] initPiggyBackIndexWithoutPBVec(int numDataUnits, + int numParityUnits) { + final int piggyBackSize = numDataUnits / (numParityUnits - 1); + int[] piggyBackIndex = new int[numParityUnits]; + + for (int i = 0; i < numDataUnits; ++i) { + if ((i % piggyBackSize) == 0) { + piggyBackIndex[i / piggyBackSize] = i; + } + } + + piggyBackIndex[numParityUnits - 1] = numDataUnits; + return piggyBackIndex; + } + + public static int[] initPiggyBackFullIndexVec(int numDataUnits, + int[] piggyBackIndex) { + int[] piggyBackFullIndex = new int[numDataUnits]; + + for (int i = 1; i < piggyBackIndex.length; ++i) { + for (int j = piggyBackIndex[i - 1]; j < piggyBackIndex[i]; ++j) { + piggyBackFullIndex[j] = i; + } + } + + return piggyBackFullIndex; + } + + public static ByteBuffer[] getPiggyBacksFromInput(ByteBuffer[] inputs, + int[] piggyBackIndex, + int numParityUnits, + int pgIndex, + RawErasureEncoder encoder) { + ByteBuffer[] emptyInput = new ByteBuffer[inputs.length]; + ByteBuffer[] tempInput = new ByteBuffer[inputs.length]; + int[] inputPositions = new int[inputs.length]; + + for (int m = 0; m < inputs.length; ++m) { + if (inputs[m] != null) { + emptyInput[m] = allocateByteBuffer(inputs[m].isDirect(), + inputs[m].remaining()); + } + } + + ByteBuffer[] tempOutput = new ByteBuffer[numParityUnits]; + for (int m = 0; m < numParityUnits; ++m) { + tempOutput[m] = allocateByteBuffer(inputs[m].isDirect(), + inputs[0].remaining()); + } + + ByteBuffer[] piggyBacks = new ByteBuffer[numParityUnits - 1]; + assert (piggyBackIndex.length >= numParityUnits); + + // using underlying RS code to create piggybacks + for (int i = 0; i < numParityUnits - 1; ++i) { + for (int k = piggyBackIndex[i]; k < piggyBackIndex[i + 1]; ++k) { + tempInput[k] = inputs[k]; + inputPositions[k] = inputs[k].position(); + } + for (int n = 0; n < emptyInput.length; ++n) { + if (tempInput[n] == null) { + tempInput[n] = emptyInput[n]; + inputPositions[n] = emptyInput[n].position(); + } + } + + encoder.encode(tempInput, tempOutput); + + piggyBacks[i] = cloneBufferData(tempOutput[pgIndex]); + + for (int j = 0; j < tempInput.length; j++) { + if (tempInput[j] != null) { + tempInput[j].position(inputPositions[j]); + tempInput[j] = null; + } + } + + for (int j = 0; j < tempOutput.length; j++) { + tempOutput[j].clear(); + } + } + + return piggyBacks; + } + + private static ByteBuffer cloneBufferData(ByteBuffer srcBuffer) { + ByteBuffer destBuffer; + byte[] bytesArr = new byte[srcBuffer.remaining()]; + + srcBuffer.mark(); + srcBuffer.get(bytesArr); + srcBuffer.reset(); + + if (!srcBuffer.isDirect()) { + destBuffer = ByteBuffer.wrap(bytesArr); + } else { + destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining()); + destBuffer.put(bytesArr); + destBuffer.flip(); + } + + return destBuffer; + } + + public static ByteBuffer allocateByteBuffer(boolean useDirectBuffer, + int bufSize) { + if (useDirectBuffer) { + return ByteBuffer.allocateDirect(bufSize); + } else { + return ByteBuffer.allocate(bufSize); + } + } + + public static ByteBuffer getPiggyBackForDecode(ByteBuffer[][] inputs, + ByteBuffer[][] outputs, + int pbParityIndex, + int numDataUnits, + int numParityUnits, + int pbIndex) { + ByteBuffer firstValidInput = HHUtil.findFirstValidInput(inputs[0]); + int bufSize = firstValidInput.remaining(); + + ByteBuffer piggybacks = allocateByteBuffer(firstValidInput.isDirect(), + bufSize); + + // Use piggyBackParityIndex to figure out 
which parity location has the + // associated piggyBack + // Obtain the piggyback by subtracting the decoded (second sub-packet + // only ) parity value from the actually read parity value + if (pbParityIndex < numParityUnits) { + // not the last piggybackSet + int inputIdx = numDataUnits + pbParityIndex; + int inputPos = inputs[1][inputIdx].position(); + int outputPos = outputs[1][pbParityIndex].position(); + + for (int m = 0, k = inputPos, n = outputPos; m < bufSize; k++, m++, n++) { + int valueWithPb = 0xFF & inputs[1][inputIdx].get(k); + int valueWithoutPb = 0xFF & outputs[1][pbParityIndex].get(n); + piggybacks.put(m, (byte) RSUtil.GF.add(valueWithPb, valueWithoutPb)); + } + } else { + // last piggybackSet + int sum = 0; + for (int k = 0; k < bufSize; k++) { + sum = 0; + for (int i = 1; i < numParityUnits; i++) { + int inIdx = numDataUnits + i; + int inPos = inputs[1][numDataUnits + i].position(); + int outPos = outputs[1][i].position(); + + sum = RSUtil.GF.add(sum, (0xFF & inputs[1][inIdx].get(inPos + k))); + sum = RSUtil.GF.add(sum, (0xFF & outputs[1][i].get(outPos + k))); + } + + sum = RSUtil.GF.add(sum, + (0xFF & inputs[0][numDataUnits + pbIndex].get( + inputs[0][numDataUnits + pbIndex].position() + k))); + + piggybacks.put(k, (byte) sum); + } + + } + + return piggybacks; + } + + /** + * Find the valid input from all the inputs. + * @param inputs input buffers to look for valid input + * @return the first valid input + */ + public static <T> T findFirstValidInput(T[] inputs) { + for (T input : inputs) { + if (input != null) { + return input; + } + } + + throw new HadoopIllegalArgumentException( + "Invalid inputs are found, all being null"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java index 0584977..261be57 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java @@ -41,7 +41,7 @@ public abstract class TestErasureCoderBase extends TestCoderBase { * at all for simple. */ protected static class TestBlock extends ECBlock { - private ECChunk[] chunks; + protected ECChunk[] chunks; // For simple, just assume the block have the chunks already ready. // In practice we need to read/write chunks from/to the block via file IO. @@ -101,7 +101,7 @@ public abstract class TestErasureCoderBase extends TestCoderBase { * This is typically how a coding step should be performed. * @param codingStep */ - private void performCodingStep(ErasureCodingStep codingStep) { + protected void performCodingStep(ErasureCodingStep codingStep) { // Pretend that we're opening these input blocks and output blocks. 
ECBlock[] inputBlocks = codingStep.getInputBlocks(); ECBlock[] outputBlocks = codingStep.getOutputBlocks(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHErasureCoderBase.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHErasureCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHErasureCoderBase.java new file mode 100644 index 0000000..5471153 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHErasureCoderBase.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import org.apache.hadoop.io.erasurecode.ECBlock; +import org.apache.hadoop.io.erasurecode.ECChunk; + + +/** + * Erasure coder test base with utilities for hitchhiker. + */ +public abstract class TestHHErasureCoderBase extends TestErasureCoderBase{ + protected int subPacketSize = 2; + + @Override + protected void performCodingStep(ErasureCodingStep codingStep) { + // Pretend that we're opening these input blocks and output blocks. + ECBlock[] inputBlocks = codingStep.getInputBlocks(); + ECBlock[] outputBlocks = codingStep.getOutputBlocks(); + // We allocate input and output chunks accordingly. + ECChunk[] inputChunks = new ECChunk[inputBlocks.length * subPacketSize]; + ECChunk[] outputChunks = new ECChunk[outputBlocks.length * subPacketSize]; + + for (int i = 0; i < numChunksInBlock; i += subPacketSize) { + // Pretend that we're reading input chunks from input blocks. + for (int k = 0; k < subPacketSize; ++k) { + for (int j = 0; j < inputBlocks.length; ++j) { + inputChunks[k * inputBlocks.length + j] = ((TestBlock) + inputBlocks[j]).chunks[i + k]; + } + + // Pretend that we allocate and will write output results to the blocks. + for (int j = 0; j < outputBlocks.length; ++j) { + outputChunks[k * outputBlocks.length + j] = allocateOutputChunk(); + ((TestBlock) outputBlocks[j]).chunks[i + k] = + outputChunks[k * outputBlocks.length + j]; + } + } + + // Given the input chunks and output chunk buffers, just call it ! 
+ codingStep.performCoding(inputChunks, outputChunks); + } + + codingStep.finish(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1bb31fb2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHXORErasureCoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHXORErasureCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHXORErasureCoder.java new file mode 100644 index 0000000..ad346e0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestHHXORErasureCoder.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory; +import org.junit.Before; +import org.junit.Test; + +public class TestHHXORErasureCoder extends TestHHErasureCoderBase { + + @Before + public void setup() { + this.encoderClass = HHXORErasureEncoder.class; + this.decoderClass = HHXORErasureDecoder.class; + this.numChunksInBlock = 10; + this.subPacketSize = 2; + } + + @Test + public void testCodingNoDirectBuffer_10x4_erasing_d0() { + prepare(null, 10, 4, new int[]{0}, new int[0]); + /** + * Doing twice to test if the coders can be repeatedly reused. This matters + * as the underlying coding buffers are shared, which may have bugs. + */ + testCoding(false); + testCoding(false); + } + + @Test + public void testCodingDirectBufferWithConf_10x4_erasing_d0() { + /** + * This tests if the configuration items work or not. 
+ */ + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, + RSRawErasureCoderFactory.class.getCanonicalName()); + prepare(conf, 10, 4, new int[]{0}, new int[0]); + + testCoding(true); + testCoding(true); + } + + @Test + public void testCodingDirectBuffer_10x4_erasing_p1() { + prepare(null, 10, 4, new int[]{}, new int[]{1}); + testCoding(true); + testCoding(true); + } + + @Test + public void testCodingDirectBuffer_10x4_erasing_d4() { + prepare(null, 10, 4, new int[] {4}, new int[] {}); + testCoding(true); + testCoding(true); + } + + @Test + public void testCodingDirectBuffer_10x4_erasing_d0_p0() { + prepare(null, 10, 4, new int[] {0}, new int[] {0}); + testCoding(true); + testCoding(true); + } + + @Test + public void testCodingBothBuffers_10x4_erasing_d0_p0() { + prepare(null, 10, 4, new int[] {0}, new int[] {0}); + + /** + * Doing in mixed buffer usage model to test if the coders can be repeatedly + * reused with different buffer usage model. This matters as the underlying + * coding buffers are shared, which may have bugs. + */ + testCoding(true); + testCoding(false); + testCoding(true); + testCoding(false); + } + + @Test + public void testCodingDirectBuffer_10x4_erasure_of_d2_d4_p0() { + prepare(null, 10, 4, new int[] {2, 4}, new int[] {0}); + testCoding(true); + } + + @Test + public void testCodingDirectBuffer_10x4_erasing_d0_d1_p0_p1() { + prepare(null, 10, 4, new int[] {0, 1}, new int[] {0, 1}); + testCoding(true); + } + +// @Test +// public void testCodingNoDirectBuffer_3x3_erasing_d0_p0() { +// prepare(null, 3, 3, new int[] {0}, new int[] {0}); +// testCoding(false); +// } + + @Test + public void testCodingDirectBuffer_6x3_erasing_d0_p0() { + prepare(null, 6, 3, new int[] {0}, new int[] {0}); + testCoding(true); + } +}
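One practical note the tests above rely on: because the Hitchhiker steps work on two sub-stripes at a time (SUB_PACKET_SIZE = 2), the flat chunk arrays handed to performCoding are ordered by whole sub-stripes, not by alternating units. A rough sketch of that layout under the same 10+4 assumption, with strings standing in for real chunk buffers (names here are purely illustrative):

    public class HHChunkLayoutDemo {
      public static void main(String[] args) {
        int numDataUnits = 10, numParityUnits = 4, subPacketSize = 2;

        // Encoder input: [sub-stripe 0: d0..d9][sub-stripe 1: d0..d9],
        // i.e. inputs.length == numDataUnits * subPacketSize == 20.
        String[] inputs = new String[numDataUnits * subPacketSize];
        for (int sp = 0; sp < subPacketSize; sp++) {
          for (int d = 0; d < numDataUnits; d++) {
            inputs[sp * numDataUnits + d] = "d" + d + "/sub" + sp;
          }
        }

        // HHXORErasureEncodingStep reshapes the flat array into
        // [subPacketSize][numDataUnits] before coding; the decoder does the
        // same with numTotalUnits (data + parity) per sub-stripe.
        String[][] hhInputs = new String[subPacketSize][numDataUnits];
        for (int i = 0; i < subPacketSize; ++i) {
          for (int j = 0; j < numDataUnits; ++j) {
            hhInputs[i][j] = inputs[i * numDataUnits + j];
          }
        }

        // Encoder output is laid out the same way with numParityUnits per
        // sub-stripe: outputs.length == numParityUnits * subPacketSize == 8.
        System.out.println(hhInputs[1][3]);   // prints "d3/sub1"
      }
    }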