HDFS-13056. Expose file-level composite CRCs in HDFS which are comparable across different instances/layouts. Contributed by Dennis Huo.
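For context, a minimal sketch of the new client-side behavior (illustrative only, not part of this commit; the paths and cluster configuration are hypothetical). With dfs.checksum.combine.mode set to COMPOSITE_CRC, getFileChecksum() returns a CompositeCrcFileChecksum whose value depends only on the file's bytes, so copies that use different block or chunk layouts compare equal:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CompositeCrcExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // New client-side option introduced by this change; the default
        // remains MD5MD5CRC.
        conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
        try (FileSystem fs = FileSystem.get(conf)) {
          // Hypothetical paths: a replicated file and an erasure-coded copy
          // holding the same bytes.
          FileChecksum replicated = fs.getFileChecksum(new Path("/rep/file"));
          FileChecksum striped = fs.getFileChecksum(new Path("/ec/file"));
          System.out.println(replicated.getAlgorithmName()); // e.g. COMPOSITE-CRC32C
          System.out.println(replicated.equals(striped));    // true if contents match
        }
      }
    }

Under the default MD5MD5CRC mode the same comparison generally fails, because the MD5-of-MD5s result depends on block and chunk boundaries.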
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c9cdad6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c9cdad6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c9cdad6 Branch: refs/heads/HDFS-7240 Commit: 7c9cdad6d04c98db5a83e2108219bf6e6c903daf Parents: 6cc59a0 Author: Xiao Chen <x...@apache.org> Authored: Tue Apr 10 20:56:07 2018 -0700 Committer: Xiao Chen <x...@apache.org> Committed: Tue Apr 10 21:31:48 2018 -0700 ---------------------------------------------------------------------- .../hadoop/fs/CompositeCrcFileChecksum.java | 82 +++++ .../main/java/org/apache/hadoop/fs/Options.java | 11 + .../org/apache/hadoop/util/CrcComposer.java | 187 ++++++++++ .../java/org/apache/hadoop/util/CrcUtil.java | 220 +++++++++++ .../org/apache/hadoop/util/DataChecksum.java | 18 + .../org/apache/hadoop/util/TestCrcComposer.java | 242 ++++++++++++ .../org/apache/hadoop/util/TestCrcUtil.java | 232 ++++++++++++ .../main/java/org/apache/hadoop/fs/Hdfs.java | 4 +- .../java/org/apache/hadoop/hdfs/DFSClient.java | 56 ++- .../hadoop/hdfs/DistributedFileSystem.java | 5 +- .../apache/hadoop/hdfs/FileChecksumHelper.java | 365 +++++++++++++------ .../hdfs/client/HdfsClientConfigKeys.java | 2 + .../hadoop/hdfs/client/impl/DfsClientConf.java | 27 ++ .../hdfs/protocol/BlockChecksumOptions.java | 54 +++ .../hadoop/hdfs/protocol/BlockChecksumType.java | 30 ++ .../datatransfer/DataTransferProtocol.java | 12 +- .../hdfs/protocol/datatransfer/Sender.java | 11 +- .../hadoop/hdfs/protocolPB/PBHelperClient.java | 44 +++ .../src/main/proto/datatransfer.proto | 7 +- .../src/main/proto/hdfs.proto | 21 ++ .../hdfs/protocol/datatransfer/Receiver.java | 8 +- .../server/datanode/BlockChecksumHelper.java | 289 +++++++++++++-- .../hdfs/server/datanode/DataXceiver.java | 26 +- ...dBlockChecksumCompositeCrcReconstructor.java | 80 ++++ ...StripedBlockChecksumMd5CrcReconstructor.java | 74 ++++ .../StripedBlockChecksumReconstructor.java | 66 ++-- .../erasurecode/StripedBlockReconstructor.java | 1 + .../src/main/resources/hdfs-default.xml | 11 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 31 +- .../apache/hadoop/hdfs/TestFileChecksum.java | 101 ++++- .../hdfs/TestFileChecksumCompositeCrc.java | 47 +++ .../hadoop/hdfs/protocolPB/TestPBHelper.java | 14 + .../hadoop/tools/mapred/TestCopyMapper.java | 173 +++++++-- .../mapred/TestCopyMapperCompositeCrc.java | 50 +++ 34 files changed, 2359 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java new file mode 100644 index 0000000..e1ed5cb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.util.CrcUtil; +import org.apache.hadoop.util.DataChecksum; + +/** Composite CRC. */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Unstable +public class CompositeCrcFileChecksum extends FileChecksum { + public static final int LENGTH = Integer.SIZE / Byte.SIZE; + + private int crc; + private DataChecksum.Type crcType; + private int bytesPerCrc; + + /** Create a CompositeCrcFileChecksum. */ + public CompositeCrcFileChecksum( + int crc, DataChecksum.Type crcType, int bytesPerCrc) { + this.crc = crc; + this.crcType = crcType; + this.bytesPerCrc = bytesPerCrc; + } + + @Override + public String getAlgorithmName() { + return "COMPOSITE-" + crcType.name(); + } + + @Override + public int getLength() { + return LENGTH; + } + + @Override + public byte[] getBytes() { + return CrcUtil.intToBytes(crc); + } + + @Override + public ChecksumOpt getChecksumOpt() { + return new ChecksumOpt(crcType, bytesPerCrc); + } + + @Override + public void readFields(DataInput in) throws IOException { + crc = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(crc); + } + + @Override + public String toString() { + return getAlgorithmName() + ":" + String.format("0x%08x", crc); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java index e455abf..126e754 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java @@ -504,4 +504,15 @@ public final class Options { } + /** + * Enum for indicating what mode to use when combining chunk and block + * checksums to define an aggregate FileChecksum. This should be considered + * a client-side runtime option rather than a persistent property of any + * stored metadata, which is why this is not part of ChecksumOpt, which + * deals with properties of files at rest. 
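+ * For example, COMPOSITE_CRC yields checksums that remain comparable + * between replicated and striped copies of the same file contents, while + * MD5MD5CRC checksums do not.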
+ */ + public enum ChecksumCombineMode { + MD5MD5CRC, // MD5 of block checksums, which are MD5 over chunk CRCs + COMPOSITE_CRC // Block/chunk-independent composite CRC + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java new file mode 100644 index 0000000..4023995 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.IOException; + +/** + * Encapsulates logic for composing multiple CRCs into one or more combined CRCs + * corresponding to concatenated underlying data ranges. Optimized for composing + * a large number of CRCs that correspond to underlying chunks of data all of + * the same size. + */ +@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"}) +@InterfaceStability.Unstable +public class CrcComposer { + private static final int CRC_SIZE_BYTES = 4; + private static final Logger LOG = LoggerFactory.getLogger(CrcComposer.class); + + private final int crcPolynomial; + private final int precomputedMonomialForHint; + private final long bytesPerCrcHint; + private final long stripeLength; + + private int curCompositeCrc = 0; + private long curPositionInStripe = 0; + private ByteArrayOutputStream digestOut = new ByteArrayOutputStream(); + + /** + * Returns a CrcComposer which will collapse all ingested CRCs into a single + * value. + */ + public static CrcComposer newCrcComposer( + DataChecksum.Type type, long bytesPerCrcHint) + throws IOException { + return newStripedCrcComposer(type, bytesPerCrcHint, Long.MAX_VALUE); + } + + /** + * Returns a CrcComposer which will collapse CRCs for every combined + * underlying data size which aligns with the specified stripe boundary. For + * example, if "update" is called with 20 CRCs and bytesPerCrc == 5, and + * stripeLength == 10, then every two (10 / 5) consecutive CRCs will be + * combined with each other, yielding a list of 10 CRC "stripes" in the + * final digest, each corresponding to 10 underlying data bytes.
Using + * a stripeLength greater than the total underlying data size is equivalent + * to using a non-striped CrcComposer. + */ + public static CrcComposer newStripedCrcComposer( + DataChecksum.Type type, long bytesPerCrcHint, long stripeLength) + throws IOException { + int polynomial = DataChecksum.getCrcPolynomialForType(type); + return new CrcComposer( + polynomial, + CrcUtil.getMonomial(bytesPerCrcHint, polynomial), + bytesPerCrcHint, + stripeLength); + } + + CrcComposer( + int crcPolynomial, + int precomputedMonomialForHint, + long bytesPerCrcHint, + long stripeLength) { + LOG.debug( + "crcPolynomial=0x{}, precomputedMonomialForHint=0x{}, " + + "bytesPerCrcHint={}, stripeLength={}", + Integer.toString(crcPolynomial, 16), + Integer.toString(precomputedMonomialForHint, 16), + bytesPerCrcHint, + stripeLength); + this.crcPolynomial = crcPolynomial; + this.precomputedMonomialForHint = precomputedMonomialForHint; + this.bytesPerCrcHint = bytesPerCrcHint; + this.stripeLength = stripeLength; + } + + /** + * Composes length / CRC_SIZE_IN_BYTES more CRCs from crcBuffer, with + * each CRC expected to correspond to exactly {@code bytesPerCrc} underlying + * data bytes. + * + * @param length must be a multiple of the expected byte-size of a CRC. + */ + public void update( + byte[] crcBuffer, int offset, int length, long bytesPerCrc) + throws IOException { + if (length % CRC_SIZE_BYTES != 0) { + throw new IOException(String.format( + "Trying to update CRC from byte array with length '%d' at offset " + + "'%d' which is not a multiple of %d!", + length, offset, CRC_SIZE_BYTES)); + } + int limit = offset + length; + while (offset < limit) { + int crcB = CrcUtil.readInt(crcBuffer, offset); + update(crcB, bytesPerCrc); + offset += CRC_SIZE_BYTES; + } + } + + /** + * Composes {@code numChecksumsToRead} additional CRCs into the current digest + * out of {@code checksumIn}, with each CRC expected to correspond to exactly + * {@code bytesPerCrc} underlying data bytes. + */ + public void update( + DataInputStream checksumIn, long numChecksumsToRead, long bytesPerCrc) + throws IOException { + for (long i = 0; i < numChecksumsToRead; ++i) { + int crcB = checksumIn.readInt(); + update(crcB, bytesPerCrc); + } + } + + /** + * Updates with a single additional CRC which corresponds to an underlying + * data size of {@code bytesPerCrc}. + */ + public void update(int crcB, long bytesPerCrc) throws IOException { + if (curCompositeCrc == 0) { + curCompositeCrc = crcB; + } else if (bytesPerCrc == bytesPerCrcHint) { + curCompositeCrc = CrcUtil.composeWithMonomial( + curCompositeCrc, crcB, precomputedMonomialForHint, crcPolynomial); + } else { + curCompositeCrc = CrcUtil.compose( + curCompositeCrc, crcB, bytesPerCrc, crcPolynomial); + } + + curPositionInStripe += bytesPerCrc; + + if (curPositionInStripe > stripeLength) { + throw new IOException(String.format( + "Current position in stripe '%d' after advancing by bytesPerCrc '%d' " + + "exceeds stripeLength '%d' without stripe alignment.", + curPositionInStripe, bytesPerCrc, stripeLength)); + } else if (curPositionInStripe == stripeLength) { + // Hit a stripe boundary; flush the curCompositeCrc and reset for next + // stripe. + digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES); + curCompositeCrc = 0; + curPositionInStripe = 0; + } + } + + /** + * Returns byte representation of composed CRCs; if no stripeLength was + * specified, the digest should be of length equal to exactly one CRC. 
+ * Otherwise, the number of CRCs in the returned array is equal to the + * total sum bytesPerCrc divided by stripeLength. If the sum of bytesPerCrc + * is not a multiple of stripeLength, then the last CRC in the array + * corresponds to totalLength % stripeLength underlying data bytes. + */ + public byte[] digest() { + if (curPositionInStripe > 0) { + digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES); + curCompositeCrc = 0; + curPositionInStripe = 0; + } + byte[] digestValue = digestOut.toByteArray(); + digestOut.reset(); + return digestValue; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java new file mode 100644 index 0000000..42eaf14 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; +import java.util.Arrays; + +/** + * This class provides utilities for working with CRCs. + */ +@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"}) +@InterfaceStability.Unstable +public final class CrcUtil { + public static final int MULTIPLICATIVE_IDENTITY = 0x80000000; + public static final int GZIP_POLYNOMIAL = 0xEDB88320; + public static final int CASTAGNOLI_POLYNOMIAL = 0x82F63B78; + + /** + * Hide default constructor for a static utils class. + */ + private CrcUtil() { + } + + /** + * Compute x^({@code lengthBytes} * 8) mod {@code mod}, where {@code mod} is + * in "reversed" (little-endian) format such that {@code mod & 1} represents + * x^31 and has an implicit term x^32. + */ + public static int getMonomial(long lengthBytes, int mod) { + if (lengthBytes == 0) { + return MULTIPLICATIVE_IDENTITY; + } else if (lengthBytes < 0) { + throw new IllegalArgumentException( + "lengthBytes must be positive, got " + lengthBytes); + } + + // Decompose into + // x^degree == x ^ SUM(bit[i] * 2^i) == PRODUCT(x ^ (bit[i] * 2^i)) + // Generate each x^(2^i) by squaring. + // Since 'degree' is in 'bits', but we only need to support byte + // granularity we can begin with x^8. 
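+ // For example, lengthBytes == 3 has degree bits 0b11, which selects the + // factors x^8 and x^16 and yields x^24 == x^(3 * 8) mod 'mod'.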
+ int multiplier = MULTIPLICATIVE_IDENTITY >>> 8; + int product = MULTIPLICATIVE_IDENTITY; + long degree = lengthBytes; + while (degree > 0) { + if ((degree & 1) != 0) { + product = (product == MULTIPLICATIVE_IDENTITY) ? multiplier : + galoisFieldMultiply(product, multiplier, mod); + } + multiplier = galoisFieldMultiply(multiplier, multiplier, mod); + degree >>= 1; + } + return product; + } + + /** + * @param monomial Precomputed x^(lengthBInBytes * 8) mod {@code mod} + */ + public static int composeWithMonomial( + int crcA, int crcB, int monomial, int mod) { + return galoisFieldMultiply(crcA, monomial, mod) ^ crcB; + } + + /** + * @param lengthB length of content corresponding to {@code crcB}, in bytes. + */ + public static int compose(int crcA, int crcB, long lengthB, int mod) { + int monomial = getMonomial(lengthB, mod); + return composeWithMonomial(crcA, crcB, monomial, mod); + } + + /** + * @return 4-byte array holding the big-endian representation of + * {@code value}. + */ + public static byte[] intToBytes(int value) { + byte[] buf = new byte[4]; + try { + writeInt(buf, 0, value); + } catch (IOException ioe) { + // Since this should only be able to occur from code bugs within this + // class rather than user input, we throw as a RuntimeException + // rather than requiring this method to declare throwing IOException + // for something the caller can't control. + throw new RuntimeException(ioe); + } + return buf; + } + + /** + * Writes big-endian representation of {@code value} into {@code buf} + * starting at {@code offset}. buf.length must be greater than or + * equal to offset + 4. + */ + public static void writeInt(byte[] buf, int offset, int value) + throws IOException { + if (offset + 4 > buf.length) { + throw new IOException(String.format( + "writeInt out of bounds: buf.length=%d, offset=%d", + buf.length, offset)); + } + buf[offset + 0] = (byte)((value >>> 24) & 0xff); + buf[offset + 1] = (byte)((value >>> 16) & 0xff); + buf[offset + 2] = (byte)((value >>> 8) & 0xff); + buf[offset + 3] = (byte)(value & 0xff); + } + + /** + * Reads 4-byte big-endian int value from {@code buf} starting at + * {@code offset}. buf.length must be greater than or equal to offset + 4. + */ + public static int readInt(byte[] buf, int offset) + throws IOException { + if (offset + 4 > buf.length) { + throw new IOException(String.format( + "readInt out of bounds: buf.length=%d, offset=%d", + buf.length, offset)); + } + int value = ((buf[offset + 0] & 0xff) << 24) | + ((buf[offset + 1] & 0xff) << 16) | + ((buf[offset + 2] & 0xff) << 8) | + ((buf[offset + 3] & 0xff)); + return value; + } + + /** + * For use with debug statements; verifies bytes.length on creation, + * expecting it to represent exactly one CRC, and returns a hex + * formatted value. + */ + public static String toSingleCrcString(final byte[] bytes) + throws IOException { + if (bytes.length != 4) { + throw new IOException((String.format( + "Unexpected byte[] length '%d' for single CRC. Contents: %s", + bytes.length, Arrays.toString(bytes)))); + } + return String.format("0x%08x", readInt(bytes, 0)); + } + + /** + * For use with debug statements; verifies bytes.length on creation, + * expecting it to be divisible by CRC byte size, and returns a list of + * hex formatted values. + */ + public static String toMultiCrcString(final byte[] bytes) + throws IOException { + if (bytes.length % 4 != 0) { + throw new IOException((String.format( + "Unexpected byte[] length '%d' not divisible by 4. 
Contents: %s", + bytes.length, Arrays.toString(bytes)))); + } + StringBuilder sb = new StringBuilder(); + sb.append('['); + for (int i = 0; i < bytes.length; i += 4) { + sb.append(String.format("0x%08x", readInt(bytes, i))); + if (i != bytes.length - 4) { + sb.append(", "); + } + } + sb.append(']'); + return sb.toString(); + } + + /** + * Galois field multiplication of {@code p} and {@code q} with the + * generator polynomial {@code m} as the modulus. + * + * @param m The little-endian polynomial to use as the modulus when + * multiplying p and q, with implicit "1" bit beyond the bottom bit. + */ + private static int galoisFieldMultiply(int p, int q, int m) { + int summation = 0; + + // Top bit is the x^0 place; each right-shift increments the degree of the + // current term. + int curTerm = MULTIPLICATIVE_IDENTITY; + + // Iteratively multiply p by x mod m as we go to represent the q[i] term + // (of degree x^i) times p. + int px = p; + + while (curTerm != 0) { + if ((q & curTerm) != 0) { + summation ^= px; + } + + // Bottom bit represents highest degree since we're little-endian; before + // we multiply by "x" for the next term, check bottom bit to know whether + // the resulting px will thus have a term matching the implicit "1" term + // of "m" and thus will need to subtract "m" after multiplying by "x". + boolean hasMaxDegree = ((px & 1) != 0); + px >>>= 1; + if (hasMaxDegree) { + px ^= m; + } + curTerm >>>= 1; + } + return summation; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java index 43e377f..06ef8ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java @@ -104,6 +104,24 @@ public class DataChecksum implements Checksum { } } + /** + * @return the int representation of the polynomial associated with the + * CRC {@code type}, suitable for use with further CRC arithmetic. + * @throws IOException if there is no CRC polynomial applicable + * to the given {@code type}.
+ */ + public static int getCrcPolynomialForType(Type type) throws IOException { + switch (type) { + case CRC32: + return CrcUtil.GZIP_POLYNOMIAL; + case CRC32C: + return CrcUtil.CASTAGNOLI_POLYNOMIAL; + default: + throw new IOException( + "No CRC polynomial could be associated with type: " + type); + } + } + public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) { if ( bytesPerChecksum <= 0 ) { return null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java new file mode 100644 index 0000000..f08702e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import static org.junit.Assert.*; + +/** + * Unittests for CrcComposer. + */ +public class TestCrcComposer { + @Rule + public Timeout globalTimeout = new Timeout(10000); + + private Random rand = new Random(1234); + + private DataChecksum.Type type = DataChecksum.Type.CRC32C; + private DataChecksum checksum = DataChecksum.newDataChecksum( + type, Integer.MAX_VALUE); + private int dataSize = 75; + private byte[] data = new byte[dataSize]; + private int chunkSize = 10; + private int cellSize = 20; + + private int fullCrc; + private int[] crcsByChunk; + private int[] crcsByCell; + + private byte[] crcBytesByChunk; + private byte[] crcBytesByCell; + + @Before + public void setup() throws IOException { + rand.nextBytes(data); + fullCrc = getRangeChecksum(data, 0, dataSize); + + // 7 chunks of size chunkSize, 1 chunk of size (dataSize % chunkSize). + crcsByChunk = new int[8]; + for (int i = 0; i < 7; ++i) { + crcsByChunk[i] = getRangeChecksum(data, i * chunkSize, chunkSize); + } + crcsByChunk[7] = getRangeChecksum( + data, (crcsByChunk.length - 1) * chunkSize, dataSize % chunkSize); + + // 3 cells of size cellSize, 1 cell of size (dataSize % cellSize). 
+ crcsByCell = new int[4]; + for (int i = 0; i < 3; ++i) { + crcsByCell[i] = getRangeChecksum(data, i * cellSize, cellSize); + } + crcsByCell[3] = getRangeChecksum( + data, (crcsByCell.length - 1) * cellSize, dataSize % cellSize); + + crcBytesByChunk = intArrayToByteArray(crcsByChunk); + crcBytesByCell = intArrayToByteArray(crcsByCell); + } + + private int getRangeChecksum(byte[] buf, int offset, int length) { + checksum.reset(); + checksum.update(buf, offset, length); + return (int) checksum.getValue(); + } + + private byte[] intArrayToByteArray(int[] values) throws IOException { + byte[] bytes = new byte[values.length * 4]; + for (int i = 0; i < values.length; ++i) { + CrcUtil.writeInt(bytes, i * 4, values[i]); + } + return bytes; + } + + @Test + public void testUnstripedIncorrectChunkSize() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + + // If we incorrectly specify that all CRCs ingested correspond to chunkSize + // when the last CRC in the array actually corresponds to + // dataSize % chunkSize then we expect the resulting CRC to not be equal to + // the fullCrc. + digester.update(crcBytesByChunk, 0, crcBytesByChunk.length, chunkSize); + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertNotEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUnstripedByteArray() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize); + digester.update( + crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUnstripedDataInputStream() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(crcBytesByChunk)); + digester.update(input, crcsByChunk.length - 1, chunkSize); + digester.update(input, 1, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUnstripedSingleCrcs() throws IOException { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + for (int i = 0; i < crcsByChunk.length - 1; ++i) { + digester.update(crcsByChunk[i], chunkSize); + } + digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testStripedByteArray() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize); + digester.update( + crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertArrayEquals(crcBytesByCell, digest); + } + + @Test + public void testStripedDataInputStream() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(crcBytesByChunk)); + digester.update(input, 
crcsByChunk.length - 1, chunkSize); + digester.update(input, 1, dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertArrayEquals(crcBytesByCell, digest); + } + + @Test + public void testStripedSingleCrcs() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + for (int i = 0; i < crcsByChunk.length - 1; ++i) { + digester.update(crcsByChunk[i], chunkSize); + } + digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize); + + byte[] digest = digester.digest(); + assertArrayEquals(crcBytesByCell, digest); + } + + @Test + public void testMultiStageMixed() throws IOException { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + + // First combine chunks into cells. + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(crcBytesByChunk)); + digester.update(input, crcsByChunk.length - 1, chunkSize); + digester.update(input, 1, dataSize % chunkSize); + byte[] digest = digester.digest(); + + // Second, individually combine cells into full crc. + digester = + CrcComposer.newCrcComposer(type, cellSize); + for (int i = 0; i < digest.length - 4; i += 4) { + int cellCrc = CrcUtil.readInt(digest, i); + digester.update(cellCrc, cellSize); + } + digester.update(digest, digest.length - 4, 4, dataSize % cellSize); + digest = digester.digest(); + assertEquals(4, digest.length); + int calculatedCrc = CrcUtil.readInt(digest, 0); + assertEquals(fullCrc, calculatedCrc); + } + + @Test + public void testUpdateMismatchesStripe() throws Exception { + CrcComposer digester = + CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize); + + digester.update(crcsByChunk[0], chunkSize); + + // Going from chunkSize to chunkSize + cellSize will cross a cellSize + // boundary in a single CRC, which is not allowed, since we'd lack a + // CRC corresponding to the actual cellSize boundary. + LambdaTestUtils.intercept( + IOException.class, + "stripe", + () -> digester.update(crcsByChunk[1], cellSize)); + } + + @Test + public void testUpdateByteArrayLengthUnalignedWithCrcSize() + throws Exception { + CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize); + + LambdaTestUtils.intercept( + IOException.class, + "length", + () -> digester.update(crcBytesByChunk, 0, 6, chunkSize)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java new file mode 100644 index 0000000..a98cb8a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import static org.junit.Assert.*; + +/** + * Unittests for CrcUtil. + */ +public class TestCrcUtil { + @Rule + public Timeout globalTimeout = new Timeout(10000); + + private Random rand = new Random(1234); + + @Test + public void testComposeCrc32() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, false); + } + + @Test + public void testComposeCrc32c() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, false); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, false); + } + + @Test + public void testComposeCrc32WithMonomial() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, true); + } + + @Test + public void testComposeCrc32cWithMonomial() throws IOException { + byte[] data = new byte[64 * 1024]; + rand.nextBytes(data); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, true); + doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, true); + } + + @Test + public void testComposeCrc32ZeroLength() throws IOException { + doTestComposeCrcZerolength(DataChecksum.Type.CRC32); + } + + @Test + public void testComposeCrc32CZeroLength() throws IOException { + doTestComposeCrcZerolength(DataChecksum.Type.CRC32C); + } + + /** + * Helper method to compare a DataChecksum-computed end-to-end CRC against + * a piecewise-computed CRC that uses CrcUtil.compose on "chunk CRCs" + * corresponding to every {@code chunkSize} bytes. + */ + private static void doTestComposeCrc( + byte[] data, DataChecksum.Type type, int chunkSize, boolean useMonomial) + throws IOException { + int crcPolynomial = DataChecksum.getCrcPolynomialForType(type); + + // Get full end-to-end CRC in a single shot first. + DataChecksum checksum = DataChecksum.newDataChecksum( + type, Integer.MAX_VALUE); + checksum.update(data, 0, data.length); + int fullCrc = (int) checksum.getValue(); + + // Now compute CRCs of each chunk individually first, and compose them in a + // second pass to compare to the end-to-end CRC.
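+ // Composition is expected to satisfy the identity + // crc(A concat B) == compose(crc(A), crc(B), length(B)), applied + // left-to-right across the chunks.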
+ int compositeCrc = 0; + int crcMonomial = + useMonomial ? CrcUtil.getMonomial(chunkSize, crcPolynomial) : 0; + for (int offset = 0; + offset + chunkSize <= data.length; + offset += chunkSize) { + checksum.reset(); + checksum.update(data, offset, chunkSize); + int partialCrc = (int) checksum.getValue(); + if (useMonomial) { + compositeCrc = CrcUtil.composeWithMonomial( + compositeCrc, partialCrc, crcMonomial, crcPolynomial); + } else { + compositeCrc = CrcUtil.compose( + compositeCrc, partialCrc, chunkSize, crcPolynomial); + } + } + + // There may be a final partial chunk smaller than chunkSize. + int partialChunkSize = data.length % chunkSize; + if (partialChunkSize > 0) { + checksum.reset(); + checksum.update(data, data.length - partialChunkSize, partialChunkSize); + int partialCrc = (int) checksum.getValue(); + compositeCrc = CrcUtil.compose( + compositeCrc, partialCrc, partialChunkSize, crcPolynomial); + } + assertEquals( + String.format( + "Using CRC type '%s' with crcPolynomial '0x%08x' and chunkSize '%d'" + + ", expected '0x%08x', got '0x%08x'", + type, crcPolynomial, chunkSize, fullCrc, compositeCrc), + fullCrc, + compositeCrc); + } + + /** + * Helper method for testing the behavior of composing a CRC with a + * zero-length second CRC. + */ + private static void doTestComposeCrcZerolength(DataChecksum.Type type) + throws IOException { + // Without loss of generality, we can pick any integer as our fake crcA + // even if we don't happen to know the preimage. + int crcA = 0xCAFEBEEF; + int crcPolynomial = DataChecksum.getCrcPolynomialForType(type); + DataChecksum checksum = DataChecksum.newDataChecksum( + type, Integer.MAX_VALUE); + int crcB = (int) checksum.getValue(); + assertEquals(crcA, CrcUtil.compose(crcA, crcB, 0, crcPolynomial)); + + int monomial = CrcUtil.getMonomial(0, crcPolynomial); + assertEquals( + crcA, CrcUtil.composeWithMonomial(crcA, crcB, monomial, crcPolynomial)); + } + + @Test + public void testIntSerialization() throws IOException { + byte[] bytes = CrcUtil.intToBytes(0xCAFEBEEF); + assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0)); + + bytes = new byte[8]; + CrcUtil.writeInt(bytes, 0, 0xCAFEBEEF); + assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0)); + CrcUtil.writeInt(bytes, 4, 0xABCDABCD); + assertEquals(0xABCDABCD, CrcUtil.readInt(bytes, 4)); + + // Assert big-endian format for general Java consistency. 
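+ // Reading at offset 2 straddles the two written ints, combining the low + // half of 0xCAFEBEEF with the high half of 0xABCDABCD.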
+ assertEquals(0xBEEFABCD, CrcUtil.readInt(bytes, 2)); + } + + @Test + public void testToSingleCrcStringBadLength() + throws Exception { + LambdaTestUtils.intercept( + IOException.class, + "length", + () -> CrcUtil.toSingleCrcString(new byte[8])); + } + + @Test + public void testToSingleCrcString() throws IOException { + byte[] buf = CrcUtil.intToBytes(0xcafebeef); + assertEquals( + "0xcafebeef", CrcUtil.toSingleCrcString(buf)); + } + + @Test + public void testToMultiCrcStringBadLength() + throws Exception { + LambdaTestUtils.intercept( + IOException.class, + "length", + () -> CrcUtil.toMultiCrcString(new byte[6])); + } + + @Test + public void testToMultiCrcStringMultipleElements() + throws IOException { + byte[] buf = new byte[12]; + CrcUtil.writeInt(buf, 0, 0xcafebeef); + CrcUtil.writeInt(buf, 4, 0xababcccc); + CrcUtil.writeInt(buf, 8, 0xddddefef); + assertEquals( + "[0xcafebeef, 0xababcccc, 0xddddefef]", + CrcUtil.toMultiCrcString(buf)); + } + + @Test + public void testToMultiCrcStringSingleElement() + throws IOException { + byte[] buf = new byte[4]; + CrcUtil.writeInt(buf, 0, 0xcafebeef); + assertEquals( + "[0xcafebeef]", + CrcUtil.toMultiCrcString(buf)); + } + + @Test + public void testToMultiCrcStringNoElements() + throws IOException { + assertEquals( + "[]", + CrcUtil.toMultiCrcString(new byte[0])); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java index 0138195..d07d5a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -130,9 +130,9 @@ public class Hdfs extends AbstractFileSystem { } @Override - public FileChecksum getFileChecksum(Path f) + public FileChecksum getFileChecksum(Path f) throws IOException, UnresolvedLinkException { - return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE); + return dfs.getFileChecksumWithCombineMode(getUriPath(f), Long.MAX_VALUE); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0875328..09154d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -66,6 +66,7 @@ import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -76,6 +77,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; +import 
org.apache.hadoop.fs.Options.ChecksumCombineMode; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; @@ -1753,18 +1755,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return encryptionKey; } - /** - * Get the checksum of the whole file or a range of the file. Note that the - * range always starts from the beginning of the file. The file can be - * in replicated form, or striped mode. It can be used to checksum and compare - * two replicated files, or two striped files, but not applicable for two - * files of different block layout forms. - * @param src The file path - * @param length the length of the range, i.e., the range is [0, length] - * @return The checksum - * @see DistributedFileSystem#getFileChecksum(Path) - */ - public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) + private FileChecksum getFileChecksumInternal( + String src, long length, ChecksumCombineMode combineMode) throws IOException { checkOpen(); Preconditions.checkArgument(length >= 0); @@ -1779,15 +1771,51 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, maker = ecPolicy != null ? new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src, - length, blockLocations, namenode, this, ecPolicy) : + length, blockLocations, namenode, this, ecPolicy, combineMode) : new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length, - blockLocations, namenode, this); + blockLocations, namenode, this, combineMode); maker.compute(); return maker.getFileChecksum(); } + /** + * Get the checksum of the whole file or a range of the file. Note that the + * range always starts from the beginning of the file. The file can be + * in replicated form, or striped mode. Depending on the + * dfs.checksum.combine.mode, checksums may or may not be comparable between + * different block layout forms. + * + * @param src The file path + * @param length the length of the range, i.e., the range is [0, length] + * @return The checksum + * @see DistributedFileSystem#getFileChecksum(Path) + */ + public FileChecksum getFileChecksumWithCombineMode(String src, long length) + throws IOException { + ChecksumCombineMode combineMode = getConf().getChecksumCombineMode(); + return getFileChecksumInternal(src, length, combineMode); + } + + /** + * Get the checksum of the whole file or a range of the file. Note that the + * range always starts from the beginning of the file. The file can be + * in replicated form, or striped mode. It can be used to checksum and compare + * two replicated files, or two striped files, but not applicable for two + * files of different block layout forms. 
+ * + * @param src The file path + * @param length the length of the range, i.e., the range is [0, length] + * @return The checksum + * @see DistributedFileSystem#getFileChecksum(Path) + */ + public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) + throws IOException { + return (MD5MD5CRC32FileChecksum) getFileChecksumInternal( + src, length, ChecksumCombineMode.MD5MD5CRC); + } + protected LocatedBlocks getBlockLocations(String src, long length) throws IOException { //get block locations for the file range http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 03cb317..1e9ed09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -1681,7 +1681,8 @@ public class DistributedFileSystem extends FileSystem return new FileSystemLinkResolver<FileChecksum>() { @Override public FileChecksum doCall(final Path p) throws IOException { - return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE); + return dfs.getFileChecksumWithCombineMode( + getPathName(p), Long.MAX_VALUE); } @Override @@ -1701,7 +1702,7 @@ public class DistributedFileSystem extends FileSystem return new FileSystemLinkResolver<FileChecksum>() { @Override public FileChecksum doCall(final Path p) throws IOException { - return dfs.getFileChecksum(getPathName(p), length); + return dfs.getFileChecksumWithCombineMode(getPathName(p), length); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java index 72cf147..8f807d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.hdfs; +import org.apache.hadoop.fs.CompositeCrcFileChecksum; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; -import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; +import org.apache.hadoop.fs.Options.ChecksumCombineMode; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; +import org.apache.hadoop.hdfs.protocol.BlockChecksumType; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -41,6 +46,8 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; +import 
org.apache.hadoop.util.CrcComposer; +import org.apache.hadoop.util.CrcUtil; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,9 +74,11 @@ final class FileChecksumHelper { private final long length; private final DFSClient client; private final ClientProtocol namenode; - private final DataOutputBuffer md5out = new DataOutputBuffer(); + private final ChecksumCombineMode combineMode; + private final BlockChecksumType blockChecksumType; + private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer(); - private MD5MD5CRC32FileChecksum fileChecksum; + private FileChecksum fileChecksum; private LocatedBlocks blockLocations; private int timeout; @@ -88,12 +97,24 @@ final class FileChecksumHelper { FileChecksumComputer(String src, long length, LocatedBlocks blockLocations, ClientProtocol namenode, - DFSClient client) throws IOException { + DFSClient client, + ChecksumCombineMode combineMode) throws IOException { this.src = src; this.length = length; this.blockLocations = blockLocations; this.namenode = namenode; this.client = client; + this.combineMode = combineMode; + switch (combineMode) { + case MD5MD5CRC: + this.blockChecksumType = BlockChecksumType.MD5CRC; + break; + case COMPOSITE_CRC: + this.blockChecksumType = BlockChecksumType.COMPOSITE_CRC; + break; + default: + throw new IOException("Unknown ChecksumCombineMode: " + combineMode); + } this.remaining = length; @@ -121,11 +142,19 @@ final class FileChecksumHelper { return namenode; } - DataOutputBuffer getMd5out() { - return md5out; + ChecksumCombineMode getCombineMode() { + return combineMode; + } + + BlockChecksumType getBlockChecksumType() { + return blockChecksumType; + } + + DataOutputBuffer getBlockChecksumBuf() { + return blockChecksumBuf; } - MD5MD5CRC32FileChecksum getFileChecksum() { + FileChecksum getFileChecksum() { return fileChecksum; } @@ -226,17 +255,31 @@ final class FileChecksumHelper { } /** - * Compute and aggregate block checksums block by block. + * Compute block checksums block by block and append the raw bytes of the + * block checksums into getBlockChecksumBuf(). + * * @throws IOException */ abstract void checksumBlocks() throws IOException; /** - * Make final file checksum result given the computing process done. + * Make final file checksum result given the per-block or per-block-group + * checksums collected into getBlockChecksumBuf(). 
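+ * Each buffered entry is a 16-byte block MD5 under MD5MD5CRC, or a + * 4-byte composite block CRC under COMPOSITE_CRC.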
*/ - MD5MD5CRC32FileChecksum makeFinalResult() { + FileChecksum makeFinalResult() throws IOException { + switch (combineMode) { + case MD5MD5CRC: + return makeMd5CrcResult(); + case COMPOSITE_CRC: + return makeCompositeCrcResult(); + default: + throw new IOException("Unknown ChecksumCombineMode: " + combineMode); + } + } + + FileChecksum makeMd5CrcResult() { //compute file MD5 - final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); + final MD5Hash fileMD5 = MD5Hash.digest(blockChecksumBuf.getData()); switch (crcType) { case CRC32: return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, @@ -250,6 +293,58 @@ final class FileChecksumHelper { } } + FileChecksum makeCompositeCrcResult() throws IOException { + long blockSizeHint = 0; + if (locatedBlocks.size() > 0) { + blockSizeHint = locatedBlocks.get(0).getBlockSize(); + } + CrcComposer crcComposer = + CrcComposer.newCrcComposer(getCrcType(), blockSizeHint); + byte[] blockChecksumBytes = blockChecksumBuf.getData(); + + long sumBlockLengths = 0; + for (int i = 0; i < locatedBlocks.size() - 1; ++i) { + LocatedBlock block = locatedBlocks.get(i); + // For everything except the last LocatedBlock, we expect getBlockSize() + // to accurately reflect the number of file bytes digested in the block + // checksum. + sumBlockLengths += block.getBlockSize(); + int blockCrc = CrcUtil.readInt(blockChecksumBytes, i * 4); + + crcComposer.update(blockCrc, block.getBlockSize()); + LOG.debug( + "Added blockCrc 0x{} for block index {} of size {}", + Integer.toString(blockCrc, 16), i, block.getBlockSize()); + } + + // NB: In some cases the located blocks have their block size adjusted + // explicitly based on the requested length, but not all cases; + // these numbers may or may not reflect actual sizes on disk. + long reportedLastBlockSize = + blockLocations.getLastLocatedBlock().getBlockSize(); + long consumedLastBlockLength = reportedLastBlockSize; + if (length - sumBlockLengths < reportedLastBlockSize) { + LOG.warn( + "Last block length {} is less than reportedLastBlockSize {}", + length - sumBlockLengths, reportedLastBlockSize); + consumedLastBlockLength = length - sumBlockLengths; + } + // NB: blockChecksumBytes.length may be much longer than actual bytes + // written into the DataOutput. + int lastBlockCrc = CrcUtil.readInt( + blockChecksumBytes, 4 * (locatedBlocks.size() - 1)); + crcComposer.update(lastBlockCrc, consumedLastBlockLength); + LOG.debug( + "Added lastBlockCrc 0x{} for block index {} of size {}", + Integer.toString(lastBlockCrc, 16), + locatedBlocks.size() - 1, + consumedLastBlockLength); + + int compositeCrc = CrcUtil.readInt(crcComposer.digest(), 0); + return new CompositeCrcFileChecksum( + compositeCrc, getCrcType(), bytesPerCRC); + } + /** * Create and return a sender given an IO stream pair. */ @@ -267,6 +362,117 @@ final class FileChecksumHelper { IOUtils.closeStream(pair.out); } } + + /** + * Parses out various checksum properties like bytesPerCrc, crcPerBlock, + * and crcType from {@code checksumData} and either stores them as the + * authoritative value or compares them to a previously extracted value + * to check compatibility.
+ * + * @param checksumData response from the datanode + * @param locatedBlock the block corresponding to the response + * @param datanode the datanode which produced the response + * @param blockIdx the block or block-group index of the response + */ + void extractChecksumProperties( + OpBlockChecksumResponseProto checksumData, + LocatedBlock locatedBlock, + DatanodeInfo datanode, + int blockIdx) + throws IOException { + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (blockIdx == 0) { //first block + setBytesPerCRC(bpc); + } else if (bpc != getBytesPerCRC()) { + if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) { + LOG.warn( + "Current bytesPerCRC={} doesn't match next bpc={}, but " + + "continuing anyway because we're using COMPOSITE_CRC. " + + "If trying to preserve CHECKSUMTYPE, only the current " + + "bytesPerCRC will be preserved.", getBytesPerCRC(), bpc); + } else { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (getLocatedBlocks().size() > 1 && blockIdx == 0) { + setCrcPerBlock(cpb); + } + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData.getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode); + } + + if (blockIdx == 0) { + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED && + getCrcType() != ct) { + if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) { + throw new IOException( + "DataChecksum.Type.MIXED is not supported for COMPOSITE_CRC"); + } else { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + } + + if (blockIdx == 0) { + LOG.debug("set bytesPerCRC={}, crcPerBlock={}", + getBytesPerCRC(), getCrcPerBlock()); + } + } + + /** + * Parses out the raw blockChecksum bytes from {@code checksumData} + * according to the blockChecksumType and populates the cumulative + * blockChecksumBuf with it. + * + * @return a debug-string representation of the parsed checksum if + * debug is enabled, otherwise null. 
+ */ + String populateBlockChecksumBuf(OpBlockChecksumResponseProto checksumData) + throws IOException { + String blockChecksumForDebug = null; + switch (getBlockChecksumType()) { + case MD5CRC: + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getBlockChecksum().toByteArray()); + md5.write(getBlockChecksumBuf()); + if (LOG.isDebugEnabled()) { + blockChecksumForDebug = md5.toString(); + } + break; + case COMPOSITE_CRC: + BlockChecksumType returnedType = PBHelperClient.convert( + checksumData.getBlockChecksumOptions().getBlockChecksumType()); + if (returnedType != BlockChecksumType.COMPOSITE_CRC) { + throw new IOException(String.format( + "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC", + returnedType)); + } + byte[] crcBytes = checksumData.getBlockChecksum().toByteArray(); + if (LOG.isDebugEnabled()) { + blockChecksumForDebug = CrcUtil.toSingleCrcString(crcBytes); + } + getBlockChecksumBuf().write(crcBytes); + break; + default: + throw new IOException( + "Unknown BlockChecksumType: " + getBlockChecksumType()); + } + return blockChecksumForDebug; + } } /** @@ -278,8 +484,10 @@ final class FileChecksumHelper { ReplicatedFileChecksumComputer(String src, long length, LocatedBlocks blockLocations, ClientProtocol namenode, - DFSClient client) throws IOException { - super(src, length, blockLocations, namenode, client); + DFSClient client, + ChecksumCombineMode combineMode) + throws IOException { + super(src, length, blockLocations, namenode, client, combineMode); } @Override @@ -295,7 +503,8 @@ final class FileChecksumHelper { LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx); if (!checksumBlock(locatedBlock)) { - throw new IOException("Fail to get block MD5 for " + locatedBlock); + throw new PathIOException( + getSrc(), "Fail to get block MD5 for " + locatedBlock); } } } @@ -368,9 +577,11 @@ final class FileChecksumHelper { LOG.debug("write to {}: {}, block={}", datanode, Op.BLOCK_CHECKSUM, block); - // get block MD5 - createSender(pair).blockChecksum(block, - locatedBlock.getBlockToken()); + // get block checksum + createSender(pair).blockChecksum( + block, + locatedBlock.getBlockToken(), + new BlockChecksumOptions(getBlockChecksumType())); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(pair.in)); @@ -381,51 +592,11 @@ final class FileChecksumHelper { OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); - - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (blockIdx == 0) { //first block - setBytesPerCRC(bpc); - } else if (bpc != getBytesPerCRC()) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + getBytesPerCRC()); - } - - //read crc-per-block - final long cpb = checksumData.getCrcPerBlock(); - if (getLocatedBlocks().size() > 1 && blockIdx == 0) { - setCrcPerBlock(cpb); - } - - //read md5 - final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); - md5.write(getMd5out()); - - // read crc-type - final DataChecksum.Type ct; - if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData.getCrcType()); - } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); - ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode); - } - - if (blockIdx == 0) { // first block - setCrcType(ct); - } else if (getCrcType() != DataChecksum.Type.MIXED - && getCrcType() != ct) { - // if crc types are mixed in a file - 
 /**
@@ -278,8 +484,10 @@ final class FileChecksumHelper {
     ReplicatedFileChecksumComputer(String src, long length,
                                    LocatedBlocks blockLocations,
                                    ClientProtocol namenode,
-                                   DFSClient client) throws IOException {
-      super(src, length, blockLocations, namenode, client);
+                                   DFSClient client,
+                                   ChecksumCombineMode combineMode)
+        throws IOException {
+      super(src, length, blockLocations, namenode, client, combineMode);
     }

     @Override
@@ -295,7 +503,8 @@ final class FileChecksumHelper {
         LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);

         if (!checksumBlock(locatedBlock)) {
-          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+          throw new PathIOException(
+              getSrc(), "Failed to get block checksum for " + locatedBlock);
         }
       }
     }
@@ -368,9 +577,11 @@ final class FileChecksumHelper {
       LOG.debug("write to {}: {}, block={}",
           datanode, Op.BLOCK_CHECKSUM, block);

-      // get block MD5
-      createSender(pair).blockChecksum(block,
-          locatedBlock.getBlockToken());
+      // get block checksum
+      createSender(pair).blockChecksum(
+          block,
+          locatedBlock.getBlockToken(),
+          new BlockChecksumOptions(getBlockChecksumType()));

       final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
           PBHelperClient.vintPrefixed(pair.in));
@@ -381,51 +592,11 @@ final class FileChecksumHelper {

       OpBlockChecksumResponseProto checksumData =
           reply.getChecksumResponse();
-
-      //read byte-per-checksum
-      final int bpc = checksumData.getBytesPerCrc();
-      if (blockIdx == 0) { //first block
-        setBytesPerCRC(bpc);
-      } else if (bpc != getBytesPerCRC()) {
-        throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-            + " but bytesPerCRC=" + getBytesPerCRC());
-      }
-
-      //read crc-per-block
-      final long cpb = checksumData.getCrcPerBlock();
-      if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
-        setCrcPerBlock(cpb);
-      }
-
-      //read md5
-      final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
-      md5.write(getMd5out());
-
-      // read crc-type
-      final DataChecksum.Type ct;
-      if (checksumData.hasCrcType()) {
-        ct = PBHelperClient.convert(checksumData.getCrcType());
-      } else {
-        LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-            "inferring checksum by reading first byte");
-        ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
-      }
-
-      if (blockIdx == 0) { // first block
-        setCrcType(ct);
-      } else if (getCrcType() != DataChecksum.Type.MIXED
-          && getCrcType() != ct) {
-        // if crc types are mixed in a file
-        setCrcType(DataChecksum.Type.MIXED);
-      }
-
-      if (LOG.isDebugEnabled()) {
-        if (blockIdx == 0) {
-          LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-              + ", crcPerBlock=" + getCrcPerBlock());
-        }
-        LOG.debug("got reply from " + datanode + ": md5=" + md5);
-      }
+      extractChecksumProperties(
+          checksumData, locatedBlock, datanode, blockIdx);
+      String blockChecksumForDebug = populateBlockChecksumBuf(checksumData);
+      LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}",
+          datanode, blockChecksumForDebug, getBlockChecksumType());
     }
   }
 }
@@ -442,9 +613,10 @@ final class FileChecksumHelper {
                                        LocatedBlocks blockLocations,
                                        ClientProtocol namenode,
                                        DFSClient client,
-                                       ErasureCodingPolicy ecPolicy)
+                                       ErasureCodingPolicy ecPolicy,
+                                       ChecksumCombineMode combineMode)
         throws IOException {
-      super(src, length, blockLocations, namenode, client);
+      super(src, length, blockLocations, namenode, client, combineMode);
       this.ecPolicy = ecPolicy;
     }

@@ -464,7 +636,8 @@ final class FileChecksumHelper {
         LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;

         if (!checksumBlockGroup(blockGroup)) {
-          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+          throw new PathIOException(
+              getSrc(), "Failed to get block checksum for " + locatedBlock);
         }
       }
     }
@@ -519,16 +692,18 @@ final class FileChecksumHelper {
         StripedBlockInfo stripedBlockInfo, DatanodeInfo datanode,
         long requestedNumBytes) throws IOException {
-
       try (IOStreamPair pair = getClient().connectToDN(datanode,
           getTimeout(), blockGroup.getBlockToken())) {

         LOG.debug("write to {}: {}, blockGroup={}",
             datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);

-        // get block MD5
-        createSender(pair).blockGroupChecksum(stripedBlockInfo,
-            blockGroup.getBlockToken(), requestedNumBytes);
+        // get block group checksum
+        createSender(pair).blockGroupChecksum(
+            stripedBlockInfo,
+            blockGroup.getBlockToken(),
+            requestedNumBytes,
+            new BlockChecksumOptions(getBlockChecksumType()));

         BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
             PBHelperClient.vintPrefixed(pair.in));
@@ -538,54 +713,10 @@ final class FileChecksumHelper {
         DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);

         OpBlockChecksumResponseProto checksumData =
             reply.getChecksumResponse();
-
-        //read byte-per-checksum
-        final int bpc = checksumData.getBytesPerCrc();
-        if (bgIdx == 0) { //first block
-          setBytesPerCRC(bpc);
-        } else {
-          if (bpc != getBytesPerCRC()) {
-            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-                + " but bytesPerCRC=" + getBytesPerCRC());
-          }
-        }
-
-        //read crc-per-block
-        final long cpb = checksumData.getCrcPerBlock();
-        if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
-          setCrcPerBlock(cpb);
-        }
-
-        //read md5
-        final MD5Hash md5 = new MD5Hash(
-            checksumData.getMd5().toByteArray());
-        md5.write(getMd5out());
-
-        // read crc-type
-        final DataChecksum.Type ct;
-        if (checksumData.hasCrcType()) {
-          ct = PBHelperClient.convert(checksumData.getCrcType());
-        } else {
-          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-              "inferring checksum by reading first byte");
-          ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
-        }
-
-        if (bgIdx == 0) {
-          setCrcType(ct);
-        } else if (getCrcType() != DataChecksum.Type.MIXED &&
-            getCrcType() != ct) {
-          // if crc types are mixed in a file
-          setCrcType(DataChecksum.Type.MIXED);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          if (bgIdx == 0) {
-            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-                + ", crcPerBlock=" + getCrcPerBlock());
-          }
-          LOG.debug("got reply from " + datanode + ": md5=" + md5);
-        }
+        extractChecksumProperties(checksumData, blockGroup, datanode, bgIdx);
+        String blockChecksumForDebug = populateBlockChecksumBuf(checksumData);
+        LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}",
+            datanode, blockChecksumForDebug, getBlockChecksumType());
       }
     }
   }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 52a7cd0..f2cec31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -120,6 +120,8 @@ public interface HdfsClientConfigKeys {
   String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  String DFS_CHECKSUM_COMBINE_MODE_KEY = "dfs.checksum.combine.mode";
+  String DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC";
   String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
       "dfs.datanode.socket.write.timeout";
   String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 2703617..e63e3f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Options.ChecksumCombineMode;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
@@ -38,6 +39,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
@@ -106,6 +109,7 @@ public class DfsClientConf {
   private final int datanodeSocketWriteTimeout;
   private final int ioBufferSize;
   private final ChecksumOpt defaultChecksumOpt;
+  private final ChecksumCombineMode checksumCombineMode;
   private final int writePacketSize;
   private final int writeMaxPackets;
   private final ByteArrayManager.Conf writeByteArrayManagerConf;
@@ -177,6 +181,7 @@ public class DfsClientConf {
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
     defaultChecksumOpt = getChecksumOptFromConf(conf);
+    checksumCombineMode = getChecksumCombineModeFromConf(conf);
     dataTransferTcpNoDelay = conf.getBoolean(
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
@@ -300,6 +305,21 @@ public class DfsClientConf {
     }
   }

+  private static ChecksumCombineMode getChecksumCombineModeFromConf(
+      Configuration conf) {
+    final String mode = conf.get(
+        DFS_CHECKSUM_COMBINE_MODE_KEY,
+        DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+    try {
+      return ChecksumCombineMode.valueOf(mode);
+    } catch (IllegalArgumentException iae) {
+      LOG.warn("Bad checksum combine mode: {}. Using default {}", mode,
+          DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+      return ChecksumCombineMode.valueOf(
+          DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+    }
+  }
+
   // Construct a checksum option from conf
   public static ChecksumOpt getChecksumOptFromConf(Configuration conf) {
     DataChecksum.Type type = getChecksumType(conf);
@@ -393,6 +413,13 @@ public class DfsClientConf {
   }

   /**
+   * @return the checksumCombineMode
+   */
+  public ChecksumCombineMode getChecksumCombineMode() {
+    return checksumCombineMode;
+  }
+
+  /**
    * @return the writePacketSize
    */
   public int getWritePacketSize() {
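The parsing above degrades gracefully: an unrecognized dfs.checksum.combine.mode value logs a warning and falls back to the MD5MD5CRC default instead of failing client construction. A test-style sketch of that behavior (it assumes DfsClientConf's constructor is public, as elsewhere in this codebase, and needs -ea for the assert):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.ChecksumCombineMode;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

public class CombineModeConfCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // An unrecognized value should fall back to the default with a WARN log.
    conf.set(HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "NOT_A_MODE");
    DfsClientConf clientConf = new DfsClientConf(conf);
    assert clientConf.getChecksumCombineMode() == ChecksumCombineMode.MD5MD5CRC;
  }
}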

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java
new file mode 100644
index 0000000..82e07d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Encapsulates various options related to how fine-grained data checksums are
+ * combined into block-level checksums.
+ */
+@InterfaceAudience.Private
+public class BlockChecksumOptions {
+  private final BlockChecksumType blockChecksumType;
+  private final long stripeLength;
+
+  public BlockChecksumOptions(
+      BlockChecksumType blockChecksumType, long stripeLength) {
+    this.blockChecksumType = blockChecksumType;
+    this.stripeLength = stripeLength;
+  }
+
+  public BlockChecksumOptions(BlockChecksumType blockChecksumType) {
+    this(blockChecksumType, 0);
+  }
+
+  public BlockChecksumType getBlockChecksumType() {
+    return blockChecksumType;
+  }
+
+  public long getStripeLength() {
+    return stripeLength;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("blockChecksumType=%s, stripeLength=%d",
+        blockChecksumType, stripeLength);
+  }
+}
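For orientation, the two constructors match the two expected call sites: replicated block checksums need no striping (stripeLength 0), while striped/EC requests can pass a nonzero stripe length. A small sketch; the 64 KB value is an arbitrary placeholder, not a value this patch prescribes:

import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
import org.apache.hadoop.hdfs.protocol.BlockChecksumType;

public class OptionsSketch {
  public static void main(String[] args) {
    // Replicated case: no striping, so stripeLength defaults to 0.
    BlockChecksumOptions plain =
        new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC);
    // Striped case: a nonzero stripe length tells the datanode how to chunk
    // the CRC composition (64 KB here is only a placeholder).
    BlockChecksumOptions striped =
        new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC, 64 * 1024);
    System.out.println(plain);
    System.out.println(striped);
  }
}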

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java
new file mode 100644
index 0000000..cc33660
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Algorithms/types denoting how block-level checksums are computed using
+ * lower-level chunk checksums/CRCs.
+ */
+@InterfaceAudience.Private
+public enum BlockChecksumType {
+  MD5CRC,  // BlockChecksum obtained by taking the MD5 digest of chunk CRCs
+  COMPOSITE_CRC  // Chunk-independent CRC, optionally striped
+}
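To make the wire interaction concrete, here is a hedged sketch of issuing the new-style BLOCK_CHECKSUM op through the Sender changes shown below. Connection setup, response parsing, and error handling are omitted; they would follow the existing FileChecksumHelper pattern:

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

public class ChecksumOpSketch {
  // Sends a BLOCK_CHECKSUM op requesting a composite CRC; the caller is
  // responsible for the DataNode connection and for reading the response.
  static void requestCompositeCrc(DataOutputStream out, ExtendedBlock block,
      Token<BlockTokenIdentifier> token) throws IOException {
    new Sender(out).blockChecksum(block, token,
        new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC));
  }
}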

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index fe20c37..384f1dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
@@ -214,11 +215,13 @@ public interface DataTransferProtocol {
    *
    * @param blk a block.
    * @param blockToken security token for accessing the block.
+   * @param blockChecksumOptions determines how the block-level checksum is
+   *     computed from underlying block metadata.
    * @throws IOException
    */
   void blockChecksum(ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken) throws IOException;
-
+      Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions) throws IOException;

   /**
    * Get striped block group checksum (MD5 of CRC32).
    *
@@ -227,9 +230,12 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @param requestedNumBytes requested number of bytes in the block group
    *     to compute the checksum.
+   * @param blockChecksumOptions determines how the block-level checksum is
+   *     computed from underlying block metadata.
    * @throws IOException
    */
   void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
       Token<BlockTokenIdentifier> blockToken,
-      long requestedNumBytes) throws IOException;
+      long requestedNumBytes,
+      BlockChecksumOptions blockChecksumOptions) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 8a8d20d..7526f96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
@@ -267,9 +268,11 @@ public class Sender implements DataTransferProtocol {

   @Override
   public void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      final Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+        .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions))
         .build();

     send(out, Op.BLOCK_CHECKSUM, proto);
@@ -277,8 +280,9 @@ public class Sender implements DataTransferProtocol {

   @Override
   public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
-      Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
-      throws IOException {
+      Token<BlockTokenIdentifier> blockToken,
+      long requestedNumBytes,
+      BlockChecksumOptions blockChecksumOptions) throws IOException {
     OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(
             stripedBlockInfo.getBlock(), blockToken))
@@ -291,6 +295,7 @@ public class Sender implements DataTransferProtocol {
         .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
             stripedBlockInfo.getErasureCodingPolicy()))
         .setRequestedNumBytes(requestedNumBytes)
+        .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions))
         .build();

     send(out, Op.BLOCK_GROUP_CHECKSUM, proto);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index d9e7aa0..ff9733c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -247,6 +249,48 @@ public class PBHelperClient {
     return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
   }

+  public static HdfsProtos.BlockChecksumTypeProto convert(
+      BlockChecksumType type) {
+    switch(type) {
+    case MD5CRC:
+      return HdfsProtos.BlockChecksumTypeProto.MD5CRC;
+    case COMPOSITE_CRC:
+      return HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC;
+    default:
+      throw new IllegalStateException(
+          "BUG: BlockChecksumType not found, type=" + type);
+    }
+  }
+
+  public static BlockChecksumType convert(
+      HdfsProtos.BlockChecksumTypeProto blockChecksumTypeProto) {
+    switch(blockChecksumTypeProto) {
+    case MD5CRC:
+      return BlockChecksumType.MD5CRC;
+    case COMPOSITE_CRC:
+      return BlockChecksumType.COMPOSITE_CRC;
+    default:
+      throw new IllegalStateException(
+          "BUG: BlockChecksumTypeProto not found, type="
+              + blockChecksumTypeProto);
+    }
+  }
+
+  public static HdfsProtos.BlockChecksumOptionsProto convert(
+      BlockChecksumOptions options) {
+    return HdfsProtos.BlockChecksumOptionsProto.newBuilder()
+        .setBlockChecksumType(convert(options.getBlockChecksumType()))
+        .setStripeLength(options.getStripeLength())
+        .build();
+  }
+
+  public static BlockChecksumOptions convert(
+      HdfsProtos.BlockChecksumOptionsProto options) {
+    return new BlockChecksumOptions(
+        convert(options.getBlockChecksumType()),
+        options.getStripeLength());
+  }
+
   public static ExtendedBlockProto convert(final ExtendedBlock b) {
     if (b == null) return null;
     return ExtendedBlockProto.newBuilder().
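Closing the loop on the converters above, a round-trip sanity sketch; it relies only on types from this patch plus the generated HdfsProtos messages (run with -ea so the asserts fire):

import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

public class PbRoundTripCheck {
  public static void main(String[] args) {
    BlockChecksumOptions before =
        new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC, 1024);
    HdfsProtos.BlockChecksumOptionsProto proto = PBHelperClient.convert(before);
    BlockChecksumOptions after = PBHelperClient.convert(proto);
    // Both fields should survive the proto round trip.
    assert after.getBlockChecksumType() == before.getBlockChecksumType();
    assert after.getStripeLength() == before.getStripeLength();
  }
}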