HADOOP-13010. Refactor raw erasure coders. Contributed by Kai Zheng
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77202fa1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77202fa1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77202fa1

Branch: refs/heads/trunk
Commit: 77202fa1035a54496d11d07472fbc399148ff630
Parents: 4f513a4
Author: Kai Zheng <kai.zh...@intel.com>
Authored: Fri May 27 13:23:34 2016 +0800
Committer: Kai Zheng <kai.zh...@intel.com>
Committed: Fri May 27 13:23:34 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/io/erasurecode/CodecUtil.java | 113 +++-------
 .../io/erasurecode/ErasureCoderOptions.java     |  89 ++++++++
 .../erasurecode/coder/HHXORErasureDecoder.java  |  18 +-
 .../erasurecode/coder/HHXORErasureEncoder.java  |  15 +-
 .../io/erasurecode/coder/RSErasureDecoder.java  |   6 +-
 .../io/erasurecode/coder/RSErasureEncoder.java  |   6 +-
 .../io/erasurecode/coder/XORErasureDecoder.java |   6 +-
 .../io/erasurecode/coder/XORErasureEncoder.java |   6 +-
 .../rawcoder/AbstractRawErasureCoder.java       | 220 -------------------
 .../rawcoder/AbstractRawErasureDecoder.java     | 181 ---------------
 .../rawcoder/AbstractRawErasureEncoder.java     | 146 ------------
 .../rawcoder/ByteArrayDecodingState.java        | 111 ++++++++++
 .../rawcoder/ByteArrayEncodingState.java        |  81 +++++++
 .../rawcoder/ByteBufferDecodingState.java       | 134 +++++++++++
 .../rawcoder/ByteBufferEncodingState.java       |  98 +++++++++
 .../io/erasurecode/rawcoder/CoderOption.java    |  43 ----
 .../io/erasurecode/rawcoder/CoderUtil.java      | 199 +++++++++++++++++
 .../io/erasurecode/rawcoder/DecodingState.java  |  55 +++++
 .../erasurecode/rawcoder/DummyRawDecoder.java   |  16 +-
 .../erasurecode/rawcoder/DummyRawEncoder.java   |  15 +-
 .../rawcoder/DummyRawErasureCoderFactory.java   |  10 +-
 .../io/erasurecode/rawcoder/EncodingState.java  |  44 ++++
 .../io/erasurecode/rawcoder/RSRawDecoder.java   |  48 ++--
 .../rawcoder/RSRawDecoderLegacy.java            |  66 +++---
 .../io/erasurecode/rawcoder/RSRawEncoder.java   |  45 ++--
 .../rawcoder/RSRawEncoderLegacy.java            |  82 ++++---
 .../rawcoder/RSRawErasureCoderFactory.java      |   9 +-
 .../RSRawErasureCoderFactoryLegacy.java         |   9 +-
 .../erasurecode/rawcoder/RawErasureCoder.java   |  73 ------
 .../rawcoder/RawErasureCoderFactory.java        |  11 +-
 .../erasurecode/rawcoder/RawErasureDecoder.java | 137 +++++++++++-
 .../erasurecode/rawcoder/RawErasureEncoder.java | 135 +++++++++++-
 .../io/erasurecode/rawcoder/XORRawDecoder.java  |  51 +++--
 .../io/erasurecode/rawcoder/XORRawEncoder.java  |  57 ++---
 .../rawcoder/XORRawErasureCoderFactory.java     |   9 +-
 .../io/erasurecode/rawcoder/package-info.java   |  38 ++++
 .../io/erasurecode/rawcoder/util/CoderUtil.java |  83 -------
 .../erasurecode/rawcoder/util/GaloisField.java  |   4 +-
 .../erasurecode/TestCodecRawCoderMapping.java   |  29 ++-
 .../hadoop/io/erasurecode/TestCoderBase.java    |  14 +-
 .../erasurecode/rawcoder/TestDummyRawCoder.java |   2 +-
 .../erasurecode/rawcoder/TestRawCoderBase.java  |  50 +++--
 .../hadoop/hdfs/DFSStripedInputStream.java      |   7 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     |   7 +-
 .../erasurecode/StripedReconstructor.java       |   7 +-
 .../apache/hadoop/hdfs/StripedFileTestUtil.java |   8 +-
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |  23 +-
 47 files changed, 1496 insertions(+), 1120 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
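With this refactoring, the codec-specific CodecUtil.createRSRawEncoder/createXORRawDecoder entry points are replaced by generic createRawEncoder/createRawDecoder methods that take a codec name and an ErasureCoderOptions. Before the per-file diffs, a minimal caller-side sketch based on the new signatures shown below; the default Configuration and the 6 data + 3 parity layout are illustrative, not part of this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.erasurecode.CodecUtil;
    import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
    import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    public class RawCoderUsageSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Schema of 6 data units + 3 parity units; the numbers are illustrative.
        ErasureCoderOptions coderOptions = new ErasureCoderOptions(6, 3);

        // The codec name selects the raw coder factory class from the configuration.
        RawErasureEncoder encoder = CodecUtil.createRawEncoder(conf,
            ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);
        RawErasureDecoder decoder = CodecUtil.createRawDecoder(conf,
            ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions);

        // encoder.encode(...) / decoder.decode(...) then accept byte[][],
        // ByteBuffer[] or ECChunk[] buffers, as in the pre-refactoring API.
      }
    }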
---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java index fcce071..9cd9561 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; @@ -36,115 +35,61 @@ public final class CodecUtil { /** * Create RS raw encoder according to configuration. - * @param conf configuration possibly with some items to configure the coder - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group + * @param conf configuration + * @param coderOptions coder options that's used to create the coder * @param codec the codec to use. If null, will use the default codec * @return raw encoder */ - public static RawErasureEncoder createRSRawEncoder( - Configuration conf, int numDataUnits, int numParityUnits, String codec) { + public static RawErasureEncoder createRawEncoder( + Configuration conf, String codec, ErasureCoderOptions coderOptions) { Preconditions.checkNotNull(conf); - if (codec == null) { - codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME; - } - RawErasureCoder rawCoder = createRawCoder(conf, - getFactNameFromCodec(conf, codec), true, numDataUnits, numParityUnits); - return (RawErasureEncoder) rawCoder; - } + Preconditions.checkNotNull(codec); - /** - * Create RS raw encoder using the default codec. - */ - public static RawErasureEncoder createRSRawEncoder( - Configuration conf, int numDataUnits, int numParityUnits) { - return createRSRawEncoder(conf, numDataUnits, numParityUnits, null); + String rawCoderFactoryKey = getFactNameFromCodec(conf, codec); + + RawErasureCoderFactory fact = createRawCoderFactory(conf, + rawCoderFactoryKey); + + return fact.createEncoder(coderOptions); } /** * Create RS raw decoder according to configuration. - * @param conf configuration possibly with some items to configure the coder - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group + * @param conf configuration + * @param coderOptions coder options that's used to create the coder * @param codec the codec to use. 
If null, will use the default codec * @return raw decoder */ - public static RawErasureDecoder createRSRawDecoder( - Configuration conf, int numDataUnits, int numParityUnits, String codec) { + public static RawErasureDecoder createRawDecoder( + Configuration conf, String codec, ErasureCoderOptions coderOptions) { Preconditions.checkNotNull(conf); - if (codec == null) { - codec = ErasureCodeConstants.RS_DEFAULT_CODEC_NAME; - } - RawErasureCoder rawCoder = createRawCoder(conf, - getFactNameFromCodec(conf, codec), false, numDataUnits, numParityUnits); - return (RawErasureDecoder) rawCoder; - } + Preconditions.checkNotNull(codec); - /** - * Create RS raw decoder using the default codec. - */ - public static RawErasureDecoder createRSRawDecoder( - Configuration conf, int numDataUnits, int numParityUnits) { - return createRSRawDecoder(conf, numDataUnits, numParityUnits, null); - } + String rawCoderFactoryKey = getFactNameFromCodec(conf, codec); - /** - * Create XOR raw encoder according to configuration. - * @param conf configuration possibly with some items to configure the coder - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group - * @return raw encoder - */ - public static RawErasureEncoder createXORRawEncoder( - Configuration conf, int numDataUnits, int numParityUnits) { - Preconditions.checkNotNull(conf); - RawErasureCoder rawCoder = createRawCoder(conf, - getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME), - true, numDataUnits, numParityUnits); - return (RawErasureEncoder) rawCoder; - } + RawErasureCoderFactory fact = createRawCoderFactory(conf, + rawCoderFactoryKey); - /** - * Create XOR raw decoder according to configuration. - * @param conf configuration possibly with some items to configure the coder - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group - * @return raw decoder - */ - public static RawErasureDecoder createXORRawDecoder( - Configuration conf, int numDataUnits, int numParityUnits) { - Preconditions.checkNotNull(conf); - RawErasureCoder rawCoder = createRawCoder(conf, - getFactNameFromCodec(conf, ErasureCodeConstants.XOR_CODEC_NAME), - false, numDataUnits, numParityUnits); - return (RawErasureDecoder) rawCoder; + return fact.createDecoder(coderOptions); } - /** - * Create raw coder using specified conf and raw coder factory key. - * @param conf configuration possibly with some items to configure the coder - * @param rawCoderFactory name of the raw coder factory - * @param isEncoder is encoder or not we're going to create - * @param numDataUnits number of data units in a coding group - * @param numParityUnits number of parity units in a coding group - * @return raw coder - */ - public static RawErasureCoder createRawCoder(Configuration conf, - String rawCoderFactory, boolean isEncoder, int numDataUnits, - int numParityUnits) { - + private static RawErasureCoderFactory createRawCoderFactory( + Configuration conf, String rawCoderFactoryKey) { RawErasureCoderFactory fact; try { Class<? 
extends RawErasureCoderFactory> factClass = conf.getClassByName( - rawCoderFactory).asSubclass(RawErasureCoderFactory.class); + rawCoderFactoryKey).asSubclass(RawErasureCoderFactory.class); fact = factClass.newInstance(); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new RuntimeException("Failed to create raw coder", e); + throw new RuntimeException("Failed to create raw coder factory", e); + } + + if (fact == null) { + throw new RuntimeException("Failed to create raw coder factory"); } - return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) : - fact.createDecoder(numDataUnits, numParityUnits); + return fact; } private static String getFactNameFromCodec(Configuration conf, String codec) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java new file mode 100644 index 0000000..106a36c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCoderOptions.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Erasure coder configuration that maintains schema info and coder options. + */ +@InterfaceAudience.Private +public final class ErasureCoderOptions { + + private final int numDataUnits; + private final int numParityUnits; + private final int numAllUnits; + private final boolean allowChangeInputs; + private final boolean allowVerboseDump; + + public ErasureCoderOptions(int numDataUnits, int numParityUnits) { + this(numDataUnits, numParityUnits, false, false); + } + + public ErasureCoderOptions(int numDataUnits, int numParityUnits, + boolean allowChangeInputs, boolean allowVerboseDump) { + this.numDataUnits = numDataUnits; + this.numParityUnits = numParityUnits; + this.numAllUnits = numDataUnits + numParityUnits; + this.allowChangeInputs = allowChangeInputs; + this.allowVerboseDump = allowVerboseDump; + } + + /** + * The number of data input units for the coding. A unit can be a byte, + * chunk or buffer or even a block. + * @return count of data input units + */ + public int getNumDataUnits() { + return numDataUnits; + } + + /** + * The number of parity output units for the coding. A unit can be a byte, + * chunk, buffer or even a block. 
+ * @return count of parity output units + */ + public int getNumParityUnits() { + return numParityUnits; + } + + /** + * The number of all the involved units in the coding. + * @return count of all the data units and parity units + */ + public int getNumAllUnits() { + return numAllUnits; + } + + /** + * Allow changing input buffer content (not positions). Maybe better + * performance if not allowed. + * @return true if allowing input content to be changed, false otherwise + */ + public boolean allowChangeInputs() { + return allowChangeInputs; + } + + /** + * Allow dump verbose debug info or not. + * @return true if verbose debug info is desired, false otherwise + */ + public boolean allowVerboseDump() { + return allowVerboseDump; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java index ac4df16..94487d8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureDecoder.java @@ -22,7 +22,10 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.*; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** * Hitchhiker is a new erasure coding algorithm developed as a research project @@ -68,17 +71,20 @@ public class HHXORErasureDecoder extends AbstractErasureDecoder { private RawErasureDecoder checkCreateRSRawDecoder() { if (rsRawDecoder == null) { - rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(), - getNumDataUnits(), getNumParityUnits()); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + getNumDataUnits(), getNumParityUnits()); + rsRawDecoder = CodecUtil.createRawDecoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rsRawDecoder; } private RawErasureEncoder checkCreateXorRawEncoder() { if (xorRawEncoder == null) { - xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(), - getNumDataUnits(), getNumParityUnits()); - xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false); + ErasureCoderOptions coderOptions = new ErasureCoderOptions( + getNumDataUnits(), getNumParityUnits()); + xorRawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, coderOptions); } return xorRawEncoder; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java index a402469..219f25c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/HHXORErasureEncoder.java @@ -22,7 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.CoderOption; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** @@ -64,17 +65,21 @@ public class HHXORErasureEncoder extends AbstractErasureEncoder { private RawErasureEncoder checkCreateRSRawEncoder() { if (rsRawEncoder == null) { - rsRawEncoder = CodecUtil.createRSRawEncoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + rsRawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rsRawEncoder; } private RawErasureEncoder checkCreateXorRawEncoder() { if (xorRawEncoder == null) { - xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(), - getNumDataUnits(), getNumParityUnits()); - xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false); + ErasureCoderOptions erasureCoderOptions = new ErasureCoderOptions( + getNumDataUnits(), getNumParityUnits()); + xorRawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, + erasureCoderOptions); } return xorRawEncoder; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java index 47efd29..afaaf24 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; /** @@ -55,8 +57,10 @@ public class RSErasureDecoder extends AbstractErasureDecoder { private RawErasureDecoder checkCreateRSRawDecoder() { if (rsRawDecoder == null) { // TODO: we should create the raw coder according to codec. 
- rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + rsRawDecoder = CodecUtil.createRawDecoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rsRawDecoder; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java index 4806d9e..2139113 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** @@ -55,8 +57,10 @@ public class RSErasureEncoder extends AbstractErasureEncoder { private RawErasureEncoder checkCreateRSRawEncoder() { if (rawEncoder == null) { // TODO: we should create the raw coder according to codec. - rawEncoder = CodecUtil.createRSRawEncoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + rawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.RS_DEFAULT_CODEC_NAME, coderOptions); } return rawEncoder; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java index a61bafd..47fb8da 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; /** @@ -43,8 +45,10 @@ public class XORErasureDecoder extends AbstractErasureDecoder { @Override protected ErasureCodingStep prepareDecodingStep( final ECBlockGroup blockGroup) { - RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + RawErasureDecoder rawDecoder = 
CodecUtil.createRawDecoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, coderOptions); ECBlock[] inputBlocks = getInputBlocks(blockGroup); http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java index 3f22247..1735179 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java @@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECBlock; import org.apache.hadoop.io.erasurecode.ECBlockGroup; import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; /** @@ -43,8 +45,10 @@ public class XORErasureEncoder extends AbstractErasureEncoder { @Override protected ErasureCodingStep prepareEncodingStep( final ECBlockGroup blockGroup) { - RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(), + ErasureCoderOptions coderOptions = new ErasureCoderOptions( getNumDataUnits(), getNumParityUnits()); + RawErasureEncoder rawEncoder = CodecUtil.createRawEncoder(getConf(), + ErasureCodeConstants.XOR_CODEC_NAME, coderOptions); ECBlock[] inputBlocks = getInputBlocks(blockGroup); http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java deleted file mode 100644 index b195216..0000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configured; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - -/** - * A common class of basic facilities to be shared by encoder and decoder - * - * It implements the {@link RawErasureCoder} interface. - */ -@InterfaceAudience.Private -public abstract class AbstractRawErasureCoder - extends Configured implements RawErasureCoder { - - private static byte[] emptyChunk = new byte[4096]; - private final int numDataUnits; - private final int numParityUnits; - private final int numAllUnits; - private final Map<CoderOption, Object> coderOptions; - - public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) { - this.numDataUnits = numDataUnits; - this.numParityUnits = numParityUnits; - this.numAllUnits = numDataUnits + numParityUnits; - this.coderOptions = new HashMap<>(3); - - coderOptions.put(CoderOption.PREFER_DIRECT_BUFFER, preferDirectBuffer()); - coderOptions.put(CoderOption.ALLOW_CHANGE_INPUTS, false); - coderOptions.put(CoderOption.ALLOW_VERBOSE_DUMP, false); - } - - @Override - public Object getCoderOption(CoderOption option) { - if (option == null) { - throw new HadoopIllegalArgumentException("Invalid option"); - } - return coderOptions.get(option); - } - - @Override - public void setCoderOption(CoderOption option, Object value) { - if (option == null || value == null) { - throw new HadoopIllegalArgumentException( - "Invalid option or option value"); - } - if (option.isReadOnly()) { - throw new HadoopIllegalArgumentException( - "The option is read-only: " + option.name()); - } - - coderOptions.put(option, value); - } - - /** - * Make sure to return an empty chunk buffer for the desired length. - * @param leastLength - * @return empty chunk of zero bytes - */ - protected static byte[] getEmptyChunk(int leastLength) { - if (emptyChunk.length >= leastLength) { - return emptyChunk; // In most time - } - - synchronized (AbstractRawErasureCoder.class) { - emptyChunk = new byte[leastLength]; - } - - return emptyChunk; - } - - @Override - public int getNumDataUnits() { - return numDataUnits; - } - - @Override - public int getNumParityUnits() { - return numParityUnits; - } - - protected int getNumAllUnits() { - return numAllUnits; - } - - @Override - public void release() { - // Nothing to do by default - } - - /** - * Tell if direct buffer is preferred or not. It's for callers to - * decide how to allocate coding chunk buffers, using DirectByteBuffer or - * bytes array. It will return false by default. - * @return true if native buffer is preferred for performance consideration, - * otherwise false. - */ - protected boolean preferDirectBuffer() { - return false; - } - - protected boolean isAllowingChangeInputs() { - Object value = getCoderOption(CoderOption.ALLOW_CHANGE_INPUTS); - if (value != null && value instanceof Boolean) { - return (boolean) value; - } - return false; - } - - protected boolean isAllowingVerboseDump() { - Object value = getCoderOption(CoderOption.ALLOW_VERBOSE_DUMP); - if (value != null && value instanceof Boolean) { - return (boolean) value; - } - return false; - } - - /** - * Ensure a buffer filled with ZERO bytes from current readable/writable - * position. 
- * @param buffer a buffer ready to read / write certain size bytes - * @return the buffer itself, with ZERO bytes written, the position and limit - * are not changed after the call - */ - protected ByteBuffer resetBuffer(ByteBuffer buffer, int len) { - int pos = buffer.position(); - buffer.put(getEmptyChunk(len), 0, len); - buffer.position(pos); - - return buffer; - } - - /** - * Ensure the buffer (either input or output) ready to read or write with ZERO - * bytes fully in specified length of len. - * @param buffer bytes array buffer - * @return the buffer itself - */ - protected byte[] resetBuffer(byte[] buffer, int offset, int len) { - byte[] empty = getEmptyChunk(len); - System.arraycopy(empty, 0, buffer, offset, len); - - return buffer; - } - - /** - * Check and ensure the buffers are of the length specified by dataLen, also - * ensure the buffers are direct buffers or not according to isDirectBuffer. - * @param buffers the buffers to check - * @param allowNull whether to allow any element to be null or not - * @param dataLen the length of data available in the buffer to ensure with - * @param isDirectBuffer is direct buffer or not to ensure with - * @param isOutputs is output buffer or not - */ - protected void checkParameterBuffers(ByteBuffer[] buffers, boolean - allowNull, int dataLen, boolean isDirectBuffer, boolean isOutputs) { - for (ByteBuffer buffer : buffers) { - if (buffer == null && !allowNull) { - throw new HadoopIllegalArgumentException( - "Invalid buffer found, not allowing null"); - } else if (buffer != null) { - if (buffer.remaining() != dataLen) { - throw new HadoopIllegalArgumentException( - "Invalid buffer, not of length " + dataLen); - } - if (buffer.isDirect() != isDirectBuffer) { - throw new HadoopIllegalArgumentException( - "Invalid buffer, isDirect should be " + isDirectBuffer); - } - if (isOutputs) { - resetBuffer(buffer, dataLen); - } - } - } - } - - /** - * Check and ensure the buffers are of the length specified by dataLen. If is - * output buffers, ensure they will be ZEROed. 
- * @param buffers the buffers to check - * @param allowNull whether to allow any element to be null or not - * @param dataLen the length of data available in the buffer to ensure with - * @param isOutputs is output buffer or not - */ - protected void checkParameterBuffers(byte[][] buffers, boolean allowNull, - int dataLen, boolean isOutputs) { - for (byte[] buffer : buffers) { - if (buffer == null && !allowNull) { - throw new HadoopIllegalArgumentException( - "Invalid buffer found, not allowing null"); - } else if (buffer != null && buffer.length != dataLen) { - throw new HadoopIllegalArgumentException( - "Invalid buffer not of length " + dataLen); - } else if (isOutputs) { - resetBuffer(buffer, 0, dataLen); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java deleted file mode 100644 index cf2b738..0000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.erasurecode.ECChunk; -import org.apache.hadoop.io.erasurecode.rawcoder.util.CoderUtil; - -import java.nio.ByteBuffer; - -/** - * An abstract raw erasure decoder that's to be inherited by new decoders. - * - * It implements the {@link RawErasureDecoder} interface. 
- */ -@InterfaceAudience.Private -public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder - implements RawErasureDecoder { - - public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); - } - - @Override - public void decode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs) { - checkParameters(inputs, erasedIndexes, outputs); - - ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); - boolean usingDirectBuffer = validInput.isDirect(); - int dataLen = validInput.remaining(); - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false); - checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); - - int[] inputPositions = new int[inputs.length]; - for (int i = 0; i < inputPositions.length; i++) { - if (inputs[i] != null) { - inputPositions[i] = inputs[i].position(); - } - } - - if (usingDirectBuffer) { - doDecode(inputs, erasedIndexes, outputs); - } else { - int[] inputOffsets = new int[inputs.length]; - int[] outputOffsets = new int[outputs.length]; - byte[][] newInputs = new byte[inputs.length][]; - byte[][] newOutputs = new byte[outputs.length][]; - - ByteBuffer buffer; - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - if (buffer != null) { - inputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newInputs[i] = buffer.array(); - } - } - - for (int i = 0; i < outputs.length; ++i) { - buffer = outputs[i]; - outputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newOutputs[i] = buffer.array(); - } - - doDecode(newInputs, inputOffsets, dataLen, - erasedIndexes, newOutputs, outputOffsets); - } - - for (int i = 0; i < inputs.length; i++) { - if (inputs[i] != null) { - // dataLen bytes consumed - inputs[i].position(inputPositions[i] + dataLen); - } - } - } - - /** - * Perform the real decoding using Direct ByteBuffer. - * @param inputs Direct ByteBuffers expected - * @param erasedIndexes indexes of erased units in the inputs array - * @param outputs Direct ByteBuffers expected - */ - protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs); - - @Override - public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) { - checkParameters(inputs, erasedIndexes, outputs); - - byte[] validInput = CoderUtil.findFirstValidInput(inputs); - int dataLen = validInput.length; - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, true, dataLen, false); - checkParameterBuffers(outputs, false, dataLen, true); - - int[] inputOffsets = new int[inputs.length]; // ALL ZERO - int[] outputOffsets = new int[outputs.length]; // ALL ZERO - - doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs, - outputOffsets); - } - - /** - * Perform the real decoding using bytes array, supporting offsets and - * lengths. 
- * @param inputs the input byte arrays to read data from - * @param inputOffsets offsets for the input byte arrays to read data from - * @param dataLen how much data are to be read from - * @param erasedIndexes indexes of erased units in the inputs array - * @param outputs the output byte arrays to write resultant data into - * @param outputOffsets offsets from which to write resultant data into - */ - protected abstract void doDecode(byte[][] inputs, int[] inputOffsets, - int dataLen, int[] erasedIndexes, - byte[][] outputs, int[] outputOffsets); - - @Override - public void decode(ECChunk[] inputs, int[] erasedIndexes, - ECChunk[] outputs) { - ByteBuffer[] newInputs = ECChunk.toBuffers(inputs); - ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs); - decode(newInputs, erasedIndexes, newOutputs); - } - - /** - * Check and validate decoding parameters, throw exception accordingly. The - * checking assumes it's a MDS code. Other code can override this. - * @param inputs input buffers to check - * @param erasedIndexes indexes of erased units in the inputs array - * @param outputs output buffers to check - */ - protected <T> void checkParameters(T[] inputs, int[] erasedIndexes, - T[] outputs) { - if (inputs.length != getNumParityUnits() + getNumDataUnits()) { - throw new IllegalArgumentException("Invalid inputs length"); - } - - if (erasedIndexes.length != outputs.length) { - throw new HadoopIllegalArgumentException( - "erasedIndexes and outputs mismatch in length"); - } - - if (erasedIndexes.length > getNumParityUnits()) { - throw new HadoopIllegalArgumentException( - "Too many erased, not recoverable"); - } - - int validInputs = 0; - for (T input : inputs) { - if (input != null) { - validInputs += 1; - } - } - - if (validInputs < getNumDataUnits()) { - throw new HadoopIllegalArgumentException( - "No enough valid inputs are provided, not recoverable"); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java deleted file mode 100644 index 49cc2c4..0000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.erasurecode.ECChunk; - -import java.nio.ByteBuffer; - -/** - * An abstract raw erasure encoder that's to be inherited by new encoders. - * - * It implements the {@link RawErasureEncoder} interface. - */ -@InterfaceAudience.Private -public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder - implements RawErasureEncoder { - - public AbstractRawErasureEncoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); - } - - @Override - public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) { - checkParameters(inputs, outputs); - - boolean usingDirectBuffer = inputs[0].isDirect(); - int dataLen = inputs[0].remaining(); - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false); - checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); - - int[] inputPositions = new int[inputs.length]; - for (int i = 0; i < inputPositions.length; i++) { - if (inputs[i] != null) { - inputPositions[i] = inputs[i].position(); - } - } - - if (usingDirectBuffer) { - doEncode(inputs, outputs); - } else { - int[] inputOffsets = new int[inputs.length]; - int[] outputOffsets = new int[outputs.length]; - byte[][] newInputs = new byte[inputs.length][]; - byte[][] newOutputs = new byte[outputs.length][]; - - ByteBuffer buffer; - for (int i = 0; i < inputs.length; ++i) { - buffer = inputs[i]; - inputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newInputs[i] = buffer.array(); - } - - for (int i = 0; i < outputs.length; ++i) { - buffer = outputs[i]; - outputOffsets[i] = buffer.arrayOffset() + buffer.position(); - newOutputs[i] = buffer.array(); - } - - doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets); - } - - for (int i = 0; i < inputs.length; i++) { - if (inputs[i] != null) { - // dataLen bytes consumed - inputs[i].position(inputPositions[i] + dataLen); - } - } - } - - /** - * Perform the real encoding work using direct ByteBuffer - * @param inputs Direct ByteBuffers expected - * @param outputs Direct ByteBuffers expected - */ - protected abstract void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs); - - @Override - public void encode(byte[][] inputs, byte[][] outputs) { - checkParameters(inputs, outputs); - int dataLen = inputs[0].length; - if (dataLen == 0) { - return; - } - checkParameterBuffers(inputs, false, dataLen, false); - checkParameterBuffers(outputs, false, dataLen, true); - - int[] inputOffsets = new int[inputs.length]; // ALL ZERO - int[] outputOffsets = new int[outputs.length]; // ALL ZERO - - doEncode(inputs, inputOffsets, dataLen, outputs, outputOffsets); - } - - /** - * Perform the real encoding work using bytes array, supporting offsets - * and lengths. 
- * @param inputs the input byte arrays to read data from - * @param inputOffsets offsets for the input byte arrays to read data from - * @param dataLen how much data are to be read from - * @param outputs the output byte arrays to write resultant data into - * @param outputOffsets offsets from which to write resultant data into - */ - protected abstract void doEncode(byte[][] inputs, int[] inputOffsets, - int dataLen, byte[][] outputs, - int[] outputOffsets); - - @Override - public void encode(ECChunk[] inputs, ECChunk[] outputs) { - ByteBuffer[] newInputs = ECChunk.toBuffers(inputs); - ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs); - encode(newInputs, newOutputs); - } - - /** - * Check and validate decoding parameters, throw exception accordingly. - * @param inputs input buffers to check - * @param outputs output buffers to check - */ - protected <T> void checkParameters(T[] inputs, T[] outputs) { - if (inputs.length != getNumDataUnits()) { - throw new HadoopIllegalArgumentException("Invalid inputs length"); - } - if (outputs.length != getNumParityUnits()) { - throw new HadoopIllegalArgumentException("Invalid outputs length"); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java new file mode 100644 index 0000000..69c084d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayDecodingState.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains decoding state during a decode call using + * byte array inputs. 
+ */ +@InterfaceAudience.Private +class ByteArrayDecodingState extends DecodingState { + byte[][] inputs; + int[] inputOffsets; + int[] erasedIndexes; + byte[][] outputs; + int[] outputOffsets; + + ByteArrayDecodingState(RawErasureDecoder decoder, byte[][] inputs, + int[] erasedIndexes, byte[][] outputs) { + this.decoder = decoder; + this.inputs = inputs; + this.outputs = outputs; + this.erasedIndexes = erasedIndexes; + byte[] validInput = CoderUtil.findFirstValidInput(inputs); + this.decodeLength = validInput.length; + + checkParameters(inputs, erasedIndexes, outputs); + checkInputBuffers(inputs); + checkOutputBuffers(outputs); + + this.inputOffsets = new int[inputs.length]; // ALL ZERO + this.outputOffsets = new int[outputs.length]; // ALL ZERO + } + + ByteArrayDecodingState(RawErasureDecoder decoder, + int decodeLength, + int[] erasedIndexes, + byte[][] inputs, + int[] inputOffsets, + byte[][] outputs, + int[] outputOffsets) { + this.decoder = decoder; + this.decodeLength = decodeLength; + this.erasedIndexes = erasedIndexes; + this.inputs = inputs; + this.outputs = outputs; + this.inputOffsets = inputOffsets; + this.outputOffsets = outputOffsets; + } + + /** + * Check and ensure the buffers are of the desired length. + * @param buffers the buffers to check + */ + void checkInputBuffers(byte[][] buffers) { + int validInputs = 0; + + for (byte[] buffer : buffers) { + if (buffer == null) { + continue; + } + + if (buffer.length != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + decodeLength); + } + + validInputs++; + } + + if (validInputs < decoder.getNumDataUnits()) { + throw new HadoopIllegalArgumentException( + "No enough valid inputs are provided, not recoverable"); + } + } + + /** + * Check and ensure the buffers are of the desired length. + * @param buffers the buffers to check + */ + void checkOutputBuffers(byte[][] buffers) { + for (byte[] buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.length != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer not of length " + decodeLength); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java new file mode 100644 index 0000000..9d861d4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteArrayEncodingState.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains encoding state during an encode call using + * byte array inputs. + */ +@InterfaceAudience.Private +class ByteArrayEncodingState extends EncodingState { + byte[][] inputs; + byte[][] outputs; + int[] inputOffsets; + int[] outputOffsets; + + ByteArrayEncodingState(RawErasureEncoder encoder, + byte[][] inputs, byte[][] outputs) { + this.encoder = encoder; + byte[] validInput = CoderUtil.findFirstValidInput(inputs); + this.encodeLength = validInput.length; + this.inputs = inputs; + this.outputs = outputs; + + checkParameters(inputs, outputs); + checkBuffers(inputs); + checkBuffers(outputs); + + this.inputOffsets = new int[inputs.length]; // ALL ZERO + this.outputOffsets = new int[outputs.length]; // ALL ZERO + } + + ByteArrayEncodingState(RawErasureEncoder encoder, + int encodeLength, + byte[][] inputs, + int[] inputOffsets, + byte[][] outputs, + int[] outputOffsets) { + this.encoder = encoder; + this.encodeLength = encodeLength; + this.inputs = inputs; + this.outputs = outputs; + this.inputOffsets = inputOffsets; + this.outputOffsets = outputOffsets; + } + + /** + * Check and ensure the buffers are of the desired length. + * @param buffers the buffers to check + */ + void checkBuffers(byte[][] buffers) { + for (byte[] buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.length != encodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer not of length " + encodeLength); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java new file mode 100644 index 0000000..5c5b0f6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferDecodingState.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.nio.ByteBuffer; + +/** + * A utility class that maintains decoding state during a decode call using + * ByteBuffer inputs. + */ +@InterfaceAudience.Private +class ByteBufferDecodingState extends DecodingState { + ByteBuffer[] inputs; + ByteBuffer[] outputs; + int[] erasedIndexes; + boolean usingDirectBuffer; + + ByteBufferDecodingState(RawErasureDecoder decoder, ByteBuffer[] inputs, + int[] erasedIndexes, ByteBuffer[] outputs) { + this.decoder = decoder; + this.inputs = inputs; + this.outputs = outputs; + this.erasedIndexes = erasedIndexes; + ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); + this.decodeLength = validInput.remaining(); + this.usingDirectBuffer = validInput.isDirect(); + + checkParameters(inputs, erasedIndexes, outputs); + checkInputBuffers(inputs); + checkOutputBuffers(outputs); + } + + /** + * Convert to a ByteArrayEncodingState when it's backed by on-heap arrays. + */ + ByteArrayDecodingState convertToByteArrayState() { + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + if (buffer != null) { + inputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newInputs[i] = buffer.array(); + } + } + + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newOutputs[i] = buffer.array(); + } + + ByteArrayDecodingState baeState = new ByteArrayDecodingState(decoder, + decodeLength, erasedIndexes, newInputs, + inputOffsets, newOutputs, outputOffsets); + return baeState; + } + + /** + * Check and ensure the buffers are of the desired length and type, direct + * buffers or not. + * @param buffers the buffers to check + */ + void checkInputBuffers(ByteBuffer[] buffers) { + int validInputs = 0; + + for (ByteBuffer buffer : buffers) { + if (buffer == null) { + continue; + } + + if (buffer.remaining() != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + decodeLength); + } + if (buffer.isDirect() != usingDirectBuffer) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, isDirect should be " + usingDirectBuffer); + } + + validInputs++; + } + + if (validInputs < decoder.getNumDataUnits()) { + throw new HadoopIllegalArgumentException( + "No enough valid inputs are provided, not recoverable"); + } + } + + /** + * Check and ensure the buffers are of the desired length and type, direct + * buffers or not. 
+ * @param buffers the buffers to check + */ + void checkOutputBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.remaining() != decodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + decodeLength); + } + if (buffer.isDirect() != usingDirectBuffer) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, isDirect should be " + usingDirectBuffer); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java new file mode 100644 index 0000000..7a10ac2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/ByteBufferEncodingState.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.nio.ByteBuffer; + +/** + * A utility class that maintains encoding state during an encode call using + * ByteBuffer inputs. + */ +@InterfaceAudience.Private +class ByteBufferEncodingState extends EncodingState { + ByteBuffer[] inputs; + ByteBuffer[] outputs; + boolean usingDirectBuffer; + + ByteBufferEncodingState(RawErasureEncoder encoder, + ByteBuffer[] inputs, ByteBuffer[] outputs) { + this.encoder = encoder; + ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); + this.encodeLength = validInput.remaining(); + this.usingDirectBuffer = validInput.isDirect(); + this.inputs = inputs; + this.outputs = outputs; + + checkParameters(inputs, outputs); + checkBuffers(inputs); + checkBuffers(outputs); + } + + /** + * Convert to a ByteArrayEncodingState when it's backed by on-heap arrays. 
+ */ + ByteArrayEncodingState convertToByteArrayState() { + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + inputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newInputs[i] = buffer.array(); + } + + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newOutputs[i] = buffer.array(); + } + + ByteArrayEncodingState baeState = new ByteArrayEncodingState(encoder, + encodeLength, newInputs, inputOffsets, newOutputs, outputOffsets); + return baeState; + } + + /** + * Check and ensure the buffers are of the desired length and type, direct + * buffers or not. + * @param buffers the buffers to check + */ + void checkBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer : buffers) { + if (buffer == null) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } + + if (buffer.remaining() != encodeLength) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, not of length " + encodeLength); + } + if (buffer.isDirect() != usingDirectBuffer) { + throw new HadoopIllegalArgumentException( + "Invalid buffer, isDirect should be " + usingDirectBuffer); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java deleted file mode 100644 index e4d97ca..0000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderOption.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.io.erasurecode.rawcoder; - -/** - * Supported erasure coder options. - */ -public enum CoderOption { - /* If direct buffer is preferred, for perf consideration */ - PREFER_DIRECT_BUFFER(true), // READ-ONLY - /** - * Allow changing input buffer content (not positions). 
- * Maybe better perf if allowed - */ - ALLOW_CHANGE_INPUTS(false), // READ-WRITE - /* Allow dump verbose debug info or not */ - ALLOW_VERBOSE_DUMP(false); // READ-WRITE - - private boolean isReadOnly = false; - - CoderOption(boolean isReadOnly) { - this.isReadOnly = isReadOnly; - } - - public boolean isReadOnly() { - return isReadOnly; - } -}; http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java new file mode 100644 index 0000000..aceb3c6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/CoderUtil.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ECChunk; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Helpful utilities for implementing some raw erasure coders. + */ +@InterfaceAudience.Private +final class CoderUtil { + + private CoderUtil() { + // Not called + } + + private static byte[] emptyChunk = new byte[4096]; + + /** + * Make sure to return an empty chunk buffer for the desired length. + * @param leastLength the minimum length needed + * @return empty chunk of zero bytes + */ + static byte[] getEmptyChunk(int leastLength) { + if (emptyChunk.length >= leastLength) { + return emptyChunk; // In most cases + } + + synchronized (CoderUtil.class) { + emptyChunk = new byte[leastLength]; + } + + return emptyChunk; + } + + /** + * Ensure a buffer is filled with ZERO bytes from the current readable/writable + * position. + * @param buffer a buffer ready to read / write certain size bytes + * @return the buffer itself, with ZERO bytes written; the position and limit + * are not changed after the call + */ + static ByteBuffer resetBuffer(ByteBuffer buffer, int len) { + int pos = buffer.position(); + buffer.put(getEmptyChunk(len), 0, len); + buffer.position(pos); + + return buffer; + } + + /** + * Ensure the buffer (either input or output) is fully filled with ZERO bytes + * in the specified length of len, ready to read or write.
+ * @param buffer bytes array buffer + * @return the buffer itself + */ + static byte[] resetBuffer(byte[] buffer, int offset, int len) { + byte[] empty = getEmptyChunk(len); + System.arraycopy(empty, 0, buffer, offset, len); + + return buffer; + } + + /** + * Initialize the output buffers with ZERO bytes. + * @param buffers + * @param dataLen + */ + static void resetOutputBuffers(ByteBuffer[] buffers, int dataLen) { + for (ByteBuffer buffer : buffers) { + resetBuffer(buffer, dataLen); + } + } + + /** + * Initialize the output buffers with ZERO bytes. + * @param buffers + * @param dataLen + */ + static void resetOutputBuffers(byte[][] buffers, int[] offsets, + int dataLen) { + for (int i = 0; i < buffers.length; i++) { + resetBuffer(buffers[i], offsets[i], dataLen); + } + } + + /** + * Convert an array of chunks to an array of ByteBuffers. + * @param chunks chunks to convert into buffers + * @return an array of ByteBuffers + */ + static ByteBuffer[] toBuffers(ECChunk[] chunks) { + ByteBuffer[] buffers = new ByteBuffer[chunks.length]; + + ECChunk chunk; + for (int i = 0; i < chunks.length; i++) { + chunk = chunks[i]; + if (chunk == null) { + buffers[i] = null; + } else { + buffers[i] = chunk.getBuffer(); + } + } + + return buffers; + } + + /** + * Clone an input byte array as a direct ByteBuffer. + * @param input + * @param offset + * @param len + * @return direct ByteBuffer + */ + static ByteBuffer cloneAsDirectByteBuffer(byte[] input, int offset, int len) { + if (input == null) { // an input can be null, if erased or not to read + return null; + } + + ByteBuffer directBuffer = ByteBuffer.allocateDirect(len); + directBuffer.put(input, offset, len); + directBuffer.flip(); + return directBuffer; + } + + /** + * Get indexes array for items marked as null, either erased or + * not to read. + * @return indexes array + */ + static <T> int[] getNullIndexes(T[] inputs) { + int[] nullIndexes = new int[inputs.length]; + int idx = 0; + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] == null) { + nullIndexes[idx++] = i; + } + } + + return Arrays.copyOf(nullIndexes, idx); + } + + /** + * Find the first valid input from all the inputs. + * @param inputs input buffers to look for valid input + * @return the first valid input + */ + static <T> T findFirstValidInput(T[] inputs) { + if (inputs.length > 0 && inputs[0] != null) { + return inputs[0]; + } + + for (T input : inputs) { + if (input != null) { + return input; + } + } + + throw new HadoopIllegalArgumentException( + "Invalid inputs are found, all being null"); + } + + /** + * Picks up indexes of valid inputs.
+ * @param inputs decoding input buffers + * @param <T> + */ + static <T> int[] getValidIndexes(T[] inputs) { + int[] validIndexes = new int[inputs.length]; + int idx = 0; + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { + validIndexes[idx++] = i; + } + } + + return Arrays.copyOf(validIndexes, idx); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java new file mode 100644 index 0000000..4b693a4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingState.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains decoding state during a decode call. + */ +@InterfaceAudience.Private +class DecodingState { + RawErasureDecoder decoder; + int decodeLength; + + /** + * Check and validate decoding parameters, throw exception accordingly. The + * checking assumes it's an MDS code. Other codes can override this.
+ * @param inputs input buffers to check + * @param erasedIndexes indexes of erased units in the inputs array + * @param outputs output buffers to check + */ + <T> void checkParameters(T[] inputs, int[] erasedIndexes, + T[] outputs) { + if (inputs.length != decoder.getNumParityUnits() + + decoder.getNumDataUnits()) { + throw new IllegalArgumentException("Invalid inputs length"); + } + + if (erasedIndexes.length != outputs.length) { + throw new HadoopIllegalArgumentException( + "erasedIndexes and outputs mismatch in length"); + } + + if (erasedIndexes.length > decoder.getNumParityUnits()) { + throw new HadoopIllegalArgumentException( + "Too many erased, not recoverable"); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java index 25dfa57..256a725 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawDecoder.java @@ -18,8 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; - -import java.nio.ByteBuffer; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A dummy raw decoder that does no real computation. @@ -28,20 +27,19 @@ import java.nio.ByteBuffer; * instead of codec, and is intended for test only. */ @InterfaceAudience.Private -public class DummyRawDecoder extends AbstractRawErasureDecoder { - public DummyRawDecoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); +public class DummyRawDecoder extends RawErasureDecoder { + + public DummyRawDecoder(ErasureCoderOptions coderOptions) { + super(coderOptions); } @Override - protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, - ByteBuffer[] outputs) { + protected void doDecode(ByteBufferDecodingState decodingState) { // Nothing to do. Output buffers have already been reset } @Override - protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen, - int[] erasedIndexes, byte[][] outputs, int[] outputOffsets) { + protected void doDecode(ByteArrayDecodingState decodingState) { // Nothing to do. 
Output buffers have already been reset } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java index 33e026d..558e350 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawEncoder.java @@ -18,8 +18,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; - -import java.nio.ByteBuffer; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A dummy raw encoder that does no real computation. @@ -28,19 +27,19 @@ import java.nio.ByteBuffer; * instead of codec, and is intended for test only. */ @InterfaceAudience.Private -public class DummyRawEncoder extends AbstractRawErasureEncoder { - public DummyRawEncoder(int numDataUnits, int numParityUnits) { - super(numDataUnits, numParityUnits); +public class DummyRawEncoder extends RawErasureEncoder { + + public DummyRawEncoder(ErasureCoderOptions coderOptions) { + super(coderOptions); } @Override - protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { + protected void doEncode(ByteArrayEncodingState encodingState) { // Nothing to do. Output buffers have already been reset } @Override - protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen, - byte[][] outputs, int[] outputOffsets) { + protected void doEncode(ByteBufferEncodingState encodingState) { // Nothing to do. Output buffers have already been reset } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java index 73457c2..31ba4ef 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DummyRawErasureCoderFactory.java @@ -18,19 +18,21 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; /** * A raw erasure coder factory for dummy raw coders. 
*/ @InterfaceAudience.Private public class DummyRawErasureCoderFactory implements RawErasureCoderFactory { + @Override - public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) { - return new DummyRawEncoder(numDataUnits, numParityUnits); + public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { + return new DummyRawEncoder(coderOptions); } @Override - public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) { - return new DummyRawDecoder(numDataUnits, numParityUnits); + public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { + return new DummyRawDecoder(coderOptions); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/77202fa1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java new file mode 100644 index 0000000..a8946d2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/EncodingState.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A utility class that maintains encoding state during an encode call. + */ +@InterfaceAudience.Private +abstract class EncodingState { + RawErasureEncoder encoder; + int encodeLength; + + /** + * Check and validate encoding parameters, throw exception accordingly. + * @param inputs input buffers to check + * @param outputs output buffers to check + */ + <T> void checkParameters(T[] inputs, T[] outputs) { + if (inputs.length != encoder.getNumDataUnits()) { + throw new HadoopIllegalArgumentException("Invalid inputs length"); + } + if (outputs.length != encoder.getNumParityUnits()) { + throw new HadoopIllegalArgumentException("Invalid outputs length"); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
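For readers following the factory signature change above, here is a minimal usage sketch of the refactored construction path. It is not part of the patch; it assumes ErasureCoderOptions offers a (numDataUnits, numParityUnits) constructor and that RawErasureEncoder offers an encode(ByteBuffer[], ByteBuffer[]) overload, neither of which appears in the hunks quoted here, so verify both against the new classes before relying on them.

import java.nio.ByteBuffer;

import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.DummyRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

/**
 * Illustrative sketch only; assumes the ErasureCoderOptions constructor and
 * the RawErasureEncoder.encode(ByteBuffer[], ByteBuffer[]) overload noted in
 * the lead-in.
 */
public class RawCoderUsageSketch {
  public static void main(String[] args) {
    // An RS(6, 3) style layout: 6 data units and 3 parity units.
    ErasureCoderOptions options = new ErasureCoderOptions(6, 3);

    // Factories now take the options object instead of two int arguments.
    RawErasureCoderFactory factory = new DummyRawErasureCoderFactory();
    RawErasureEncoder encoder = factory.createEncoder(options);

    // One equally sized buffer per data unit in, one per parity unit out,
    // matching the checks in EncodingState.checkParameters().
    int cellSize = 1024;
    ByteBuffer[] inputs = new ByteBuffer[6];
    ByteBuffer[] outputs = new ByteBuffer[3];
    for (int i = 0; i < inputs.length; i++) {
      inputs[i] = ByteBuffer.allocate(cellSize);
    }
    for (int i = 0; i < outputs.length; i++) {
      outputs[i] = ByteBuffer.allocate(cellSize);
    }

    // The dummy encoder performs no real computation, so this only exercises
    // the parameter and buffer validation paths.
    encoder.encode(inputs, outputs);
  }
}

A real coder factory such as RSRawErasureCoderFactory would be used the same way, which is the point of moving the (numDataUnits, numParityUnits) pair into ErasureCoderOptions.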
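The ByteBuffer-to-byte[] conversions above (convertToByteArrayState in both new state classes) depend on the usable region of a heap buffer starting at arrayOffset() + position() within its backing array. The following standalone, JDK-only snippet is illustrative and not from the patch; it shows that offset arithmetic and why direct buffers cannot take this path.

import java.nio.ByteBuffer;

/**
 * Demonstrates the arrayOffset() + position() arithmetic used by
 * convertToByteArrayState() when handing heap buffers to byte[]-based coders.
 */
public class HeapBufferOffsetDemo {
  public static void main(String[] args) {
    byte[] backing = new byte[16];
    for (int i = 0; i < backing.length; i++) {
      backing[i] = (byte) i;
    }

    // A heap buffer exposing only bytes 4..11 of the backing array.
    ByteBuffer buffer = ByteBuffer.wrap(backing, 4, 8).slice();
    buffer.position(2); // pretend two bytes were already consumed

    // This is the offset a byte[]-based coder needs to start reading from.
    int offset = buffer.arrayOffset() + buffer.position();
    System.out.println(offset);                  // 6
    System.out.println(buffer.array()[offset]);  // 6, the first readable byte

    // Direct buffers expose no backing array, which is why the state classes
    // record usingDirectBuffer and leave direct buffers to the
    // ByteBuffer-based coding path.
    System.out.println(ByteBuffer.allocateDirect(8).hasArray()); // false
  }
}

The same arithmetic appears in both ByteBufferEncodingState and ByteBufferDecodingState above; only the null handling differs, since a decoder's inputs may legitimately contain nulls for erased or unread units.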