HDFS-13056. Expose file-level composite CRCs in HDFS which are comparable across different instances/layouts. Contributed by Dennis Huo.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c9cdad6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c9cdad6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c9cdad6

Branch: refs/heads/trunk
Commit: 7c9cdad6d04c98db5a83e2108219bf6e6c903daf
Parents: 6cc59a0
Author: Xiao Chen <x...@apache.org>
Authored: Tue Apr 10 20:56:07 2018 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Tue Apr 10 21:31:48 2018 -0700

----------------------------------------------------------------------
 .../hadoop/fs/CompositeCrcFileChecksum.java     |  82 +++++
 .../main/java/org/apache/hadoop/fs/Options.java |  11 +
 .../org/apache/hadoop/util/CrcComposer.java     | 187 ++++++++++
 .../java/org/apache/hadoop/util/CrcUtil.java    | 220 +++++++++++
 .../org/apache/hadoop/util/DataChecksum.java    |  18 +
 .../org/apache/hadoop/util/TestCrcComposer.java | 242 ++++++++++++
 .../org/apache/hadoop/util/TestCrcUtil.java     | 232 ++++++++++++
 .../main/java/org/apache/hadoop/fs/Hdfs.java    |   4 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  56 ++-
 .../hadoop/hdfs/DistributedFileSystem.java      |   5 +-
 .../apache/hadoop/hdfs/FileChecksumHelper.java  | 365 +++++++++++++------
 .../hdfs/client/HdfsClientConfigKeys.java       |   2 +
 .../hadoop/hdfs/client/impl/DfsClientConf.java  |  27 ++
 .../hdfs/protocol/BlockChecksumOptions.java     |  54 +++
 .../hadoop/hdfs/protocol/BlockChecksumType.java |  30 ++
 .../datatransfer/DataTransferProtocol.java      |  12 +-
 .../hdfs/protocol/datatransfer/Sender.java      |  11 +-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  44 +++
 .../src/main/proto/datatransfer.proto           |   7 +-
 .../src/main/proto/hdfs.proto                   |  21 ++
 .../hdfs/protocol/datatransfer/Receiver.java    |   8 +-
 .../server/datanode/BlockChecksumHelper.java    | 289 +++++++++++++--
 .../hdfs/server/datanode/DataXceiver.java       |  26 +-
 ...dBlockChecksumCompositeCrcReconstructor.java |  80 ++++
 ...StripedBlockChecksumMd5CrcReconstructor.java |  74 ++++
 .../StripedBlockChecksumReconstructor.java      |  66 ++--
 .../erasurecode/StripedBlockReconstructor.java  |   1 +
 .../src/main/resources/hdfs-default.xml         |  11 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  31 +-
 .../apache/hadoop/hdfs/TestFileChecksum.java    | 101 ++++-
 .../hdfs/TestFileChecksumCompositeCrc.java      |  47 +++
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |  14 +
 .../hadoop/tools/mapred/TestCopyMapper.java     | 173 +++++++--
 .../mapred/TestCopyMapperCompositeCrc.java      |  50 +++
 34 files changed, 2359 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
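
As context for the feature, a minimal client-side sketch, assuming the
dfs.checksum.combine.mode property referenced in the DFSClient javadoc below
accepts the enum name COMPOSITE_CRC as its value; the paths and cluster setup
here are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CompositeCrcChecksumSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Ask the HDFS client to combine chunk/block CRCs into a composite CRC
    // instead of the layout-dependent MD5-of-MD5s (assumed value: enum name).
    conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");

    FileSystem fs = FileSystem.get(conf);
    // Hypothetical paths: the same contents stored replicated and erasure-coded.
    FileChecksum replicated = fs.getFileChecksum(new Path("/data/replicated/part-0"));
    FileChecksum striped = fs.getFileChecksum(new Path("/data/ec/part-0"));

    // With COMPOSITE_CRC the two results are comparable even though the
    // block layouts differ.
    System.out.println(replicated + " vs " + striped
        + " -> equal=" + replicated.equals(striped));
  }
}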


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java
new file mode 100644
index 0000000..e1ed5cb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CompositeCrcFileChecksum.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
+
+/** Composite CRC. */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Unstable
+public class CompositeCrcFileChecksum extends FileChecksum {
+  public static final int LENGTH = Integer.SIZE / Byte.SIZE;
+
+  private int crc;
+  private DataChecksum.Type crcType;
+  private int bytesPerCrc;
+
+  /** Create a CompositeCrcFileChecksum. */
+  public CompositeCrcFileChecksum(
+      int crc, DataChecksum.Type crcType, int bytesPerCrc) {
+    this.crc = crc;
+    this.crcType = crcType;
+    this.bytesPerCrc = bytesPerCrc;
+  }
+
+  @Override
+  public String getAlgorithmName() {
+    return "COMPOSITE-" + crcType.name();
+  }
+
+  @Override
+  public int getLength() {
+    return LENGTH;
+  }
+
+  @Override
+  public byte[] getBytes() {
+    return CrcUtil.intToBytes(crc);
+  }
+
+  @Override
+  public ChecksumOpt getChecksumOpt() {
+    return new ChecksumOpt(crcType, bytesPerCrc);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    crc = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(crc);
+  }
+
+  @Override
+  public String toString() {
+    return getAlgorithmName() + ":" + String.format("0x%08x", crc);
+  }
+}
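
A small illustrative sketch of the new checksum type's surface as added above;
the CRC value 0xcafebeef is an arbitrary stand-in, not a real file digest:

import org.apache.hadoop.fs.CompositeCrcFileChecksum;
import org.apache.hadoop.util.DataChecksum;

public class CompositeCrcFileChecksumDemo {
  public static void main(String[] args) {
    CompositeCrcFileChecksum checksum =
        new CompositeCrcFileChecksum(0xcafebeef, DataChecksum.Type.CRC32C, 512);
    System.out.println(checksum.getAlgorithmName()); // COMPOSITE-CRC32C
    System.out.println(checksum.getLength());        // 4
    System.out.println(checksum);                    // COMPOSITE-CRC32C:0xcafebeef
  }
}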

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index e455abf..126e754 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -504,4 +504,15 @@ public final class Options {
 
   }
 
+  /**
+   * Enum for indicating what mode to use when combining chunk and block
+   * checksums to define an aggregate FileChecksum. This should be considered
+   * a client-side runtime option rather than a persistent property of any
+   * stored metadata, which is why this is not part of ChecksumOpt, which
+   * deals with properties of files at rest.
+   */
+  public enum ChecksumCombineMode {
+    MD5MD5CRC,  // MD5 of block checksums, which are MD5 over chunk CRCs
+    COMPOSITE_CRC  // Block/chunk-independent composite CRC
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java
new file mode 100644
index 0000000..4023995
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcComposer.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Encapsulates logic for composing multiple CRCs into one or more combined CRCs
+ * corresponding to concatenated underlying data ranges. Optimized for composing
+ * a large number of CRCs that correspond to underlying chunks of data all of
+ * the same size.
+ */
+@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"})
+@InterfaceStability.Unstable
+public class CrcComposer {
+  private static final int CRC_SIZE_BYTES = 4;
+  private static final Logger LOG = LoggerFactory.getLogger(CrcComposer.class);
+
+  private final int crcPolynomial;
+  private final int precomputedMonomialForHint;
+  private final long bytesPerCrcHint;
+  private final long stripeLength;
+
+  private int curCompositeCrc = 0;
+  private long curPositionInStripe = 0;
+  private ByteArrayOutputStream digestOut = new ByteArrayOutputStream();
+
+  /**
+   * Returns a CrcComposer which will collapse all ingested CRCs into a single
+   * value.
+   */
+  public static CrcComposer newCrcComposer(
+      DataChecksum.Type type, long bytesPerCrcHint)
+      throws IOException {
+    return newStripedCrcComposer(type, bytesPerCrcHint, Long.MAX_VALUE);
+  }
+
+  /**
+   * Returns a CrcComposer which will collapse CRCs for every combined
+   * underlying data size which aligns with the specified stripe boundary. For
+   * example, if "update" is called with 20 CRCs and bytesPerCrc == 5, and
+   * stripeLength == 10, then every two (10 / 5) consecutive CRCs will be
+   * combined with each other, yielding a list of 10 CRC "stripes" in the
+   * final digest, each corresponding to 10 underlying data bytes. Using
+   * a stripeLength greater than the total underlying data size is equivalent
+   * to using a non-striped CrcComposer.
+   */
+  public static CrcComposer newStripedCrcComposer(
+      DataChecksum.Type type, long bytesPerCrcHint, long stripeLength)
+      throws IOException {
+    int polynomial = DataChecksum.getCrcPolynomialForType(type);
+    return new CrcComposer(
+        polynomial,
+        CrcUtil.getMonomial(bytesPerCrcHint, polynomial),
+        bytesPerCrcHint,
+        stripeLength);
+  }
+
+  CrcComposer(
+      int crcPolynomial,
+      int precomputedMonomialForHint,
+      long bytesPerCrcHint,
+      long stripeLength) {
+    LOG.debug(
+        "crcPolynomial=0x{}, precomputedMonomialForHint=0x{}, "
+        + "bytesPerCrcHint={}, stripeLength={}",
+        Integer.toString(crcPolynomial, 16),
+        Integer.toString(precomputedMonomialForHint, 16),
+        bytesPerCrcHint,
+        stripeLength);
+    this.crcPolynomial = crcPolynomial;
+    this.precomputedMonomialForHint = precomputedMonomialForHint;
+    this.bytesPerCrcHint = bytesPerCrcHint;
+    this.stripeLength = stripeLength;
+  }
+
+  /**
+   * Composes length / CRC_SIZE_BYTES more CRCs from crcBuffer, with
+   * each CRC expected to correspond to exactly {@code bytesPerCrc} underlying
+   * data bytes.
+   *
+   * @param length must be a multiple of the expected byte-size of a CRC.
+   */
+  public void update(
+      byte[] crcBuffer, int offset, int length, long bytesPerCrc)
+      throws IOException {
+    if (length % CRC_SIZE_BYTES != 0) {
+      throw new IOException(String.format(
+          "Trying to update CRC from byte array with length '%d' at offset "
+          + "'%d' which is not a multiple of %d!",
+          length, offset, CRC_SIZE_BYTES));
+    }
+    int limit = offset + length;
+    while (offset < limit) {
+      int crcB = CrcUtil.readInt(crcBuffer, offset);
+      update(crcB, bytesPerCrc);
+      offset += CRC_SIZE_BYTES;
+    }
+  }
+
+  /**
+   * Composes {@code numChecksumsToRead} additional CRCs into the current digest
+   * out of {@code checksumIn}, with each CRC expected to correspond to exactly
+   * {@code bytesPerCrc} underlying data bytes.
+   */
+  public void update(
+      DataInputStream checksumIn, long numChecksumsToRead, long bytesPerCrc)
+      throws IOException {
+    for (long i = 0; i < numChecksumsToRead; ++i) {
+      int crcB = checksumIn.readInt();
+      update(crcB, bytesPerCrc);
+    }
+  }
+
+  /**
+   * Updates with a single additional CRC which corresponds to an underlying
+   * data size of {@code bytesPerCrc}.
+   */
+  public void update(int crcB, long bytesPerCrc) throws IOException {
+    if (curCompositeCrc == 0) {
+      curCompositeCrc = crcB;
+    } else if (bytesPerCrc == bytesPerCrcHint) {
+      curCompositeCrc = CrcUtil.composeWithMonomial(
+          curCompositeCrc, crcB, precomputedMonomialForHint, crcPolynomial);
+    } else {
+      curCompositeCrc = CrcUtil.compose(
+          curCompositeCrc, crcB, bytesPerCrc, crcPolynomial);
+    }
+
+    curPositionInStripe += bytesPerCrc;
+
+    if (curPositionInStripe > stripeLength) {
+      throw new IOException(String.format(
+          "Current position in stripe '%d' after advancing by bytesPerCrc '%d' "
+          + "exceeds stripeLength '%d' without stripe alignment.",
+          curPositionInStripe, bytesPerCrc, stripeLength));
+    } else if (curPositionInStripe == stripeLength) {
+      // Hit a stripe boundary; flush the curCompositeCrc and reset for next
+      // stripe.
+      digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES);
+      curCompositeCrc = 0;
+      curPositionInStripe = 0;
+    }
+  }
+
+  /**
+   * Returns byte representation of composed CRCs; if no stripeLength was
+   * specified, the digest should be of length equal to exactly one CRC.
+   * Otherwise, the number of CRCs in the returned array is equal to the
+   * total sum of bytesPerCrc divided by stripeLength. If the sum of bytesPerCrc
+   * is not a multiple of stripeLength, then the last CRC in the array
+   * corresponds to totalLength % stripeLength underlying data bytes.
+   */
+  public byte[] digest() {
+    if (curPositionInStripe > 0) {
+      digestOut.write(CrcUtil.intToBytes(curCompositeCrc), 0, CRC_SIZE_BYTES);
+      curCompositeCrc = 0;
+      curPositionInStripe = 0;
+    }
+    byte[] digestValue = digestOut.toByteArray();
+    digestOut.reset();
+    return digestValue;
+  }
+}
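
To make the striping behavior of newStripedCrcComposer concrete, a hedged
sketch that feeds twenty 5-byte chunk CRCs into a composer with a 10-byte
stripe length, yielding ten composite CRCs; the data is random and purely
illustrative:

import org.apache.hadoop.util.CrcComposer;
import org.apache.hadoop.util.CrcUtil;
import org.apache.hadoop.util.DataChecksum;

public class CrcComposerSketch {
  public static void main(String[] args) throws Exception {
    byte[] data = new byte[100];               // 20 chunks of 5 bytes each
    new java.util.Random(42).nextBytes(data);

    DataChecksum crc = DataChecksum.newDataChecksum(
        DataChecksum.Type.CRC32C, Integer.MAX_VALUE);

    // Stripe every 10 bytes, so each pair of 5-byte chunk CRCs collapses into
    // one composite CRC, as described in the newStripedCrcComposer javadoc.
    CrcComposer composer = CrcComposer.newStripedCrcComposer(
        DataChecksum.Type.CRC32C, 5, 10);
    for (int offset = 0; offset < data.length; offset += 5) {
      crc.reset();
      crc.update(data, offset, 5);
      composer.update((int) crc.getValue(), 5);
    }

    byte[] digest = composer.digest();         // 10 stripes * 4 bytes = 40 bytes
    System.out.println(CrcUtil.toMultiCrcString(digest));
  }
}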

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java
new file mode 100644
index 0000000..42eaf14
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CrcUtil.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class provides utilities for working with CRCs.
+ */
+@InterfaceAudience.LimitedPrivate({"Common", "HDFS", "MapReduce", "Yarn"})
+@InterfaceStability.Unstable
+public final class CrcUtil {
+  public static final int MULTIPLICATIVE_IDENTITY = 0x80000000;
+  public static final int GZIP_POLYNOMIAL = 0xEDB88320;
+  public static final int CASTAGNOLI_POLYNOMIAL = 0x82F63B78;
+
+  /**
+   * Hide default constructor for a static utils class.
+   */
+  private CrcUtil() {
+  }
+
+  /**
+   * Compute x^({@code lengthBytes} * 8) mod {@code mod}, where {@code mod} is
+   * in "reversed" (little-endian) format such that {@code mod & 1} represents
+   * x^31 and has an implicit term x^32.
+   */
+  public static int getMonomial(long lengthBytes, int mod) {
+    if (lengthBytes == 0) {
+      return MULTIPLICATIVE_IDENTITY;
+    } else if (lengthBytes < 0) {
+      throw new IllegalArgumentException(
+          "lengthBytes must be positive, got " + lengthBytes);
+    }
+
+    // Decompose into
+    // x^degree == x ^ SUM(bit[i] * 2^i) == PRODUCT(x ^ (bit[i] * 2^i))
+    // Generate each x^(2^i) by squaring.
+    // Since 'degree' is in 'bits', but we only need to support byte
+    // granularity we can begin with x^8.
+    int multiplier = MULTIPLICATIVE_IDENTITY >>> 8;
+    int product = MULTIPLICATIVE_IDENTITY;
+    long degree = lengthBytes;
+    while (degree > 0) {
+      if ((degree & 1) != 0) {
+        product = (product == MULTIPLICATIVE_IDENTITY) ? multiplier :
+            galoisFieldMultiply(product, multiplier, mod);
+      }
+      multiplier = galoisFieldMultiply(multiplier, multiplier, mod);
+      degree >>= 1;
+    }
+    return product;
+  }
+
+  /**
+   * @param monomial Precomputed x^(lengthBInBytes * 8) mod {@code mod}
+   */
+  public static int composeWithMonomial(
+      int crcA, int crcB, int monomial, int mod) {
+    return galoisFieldMultiply(crcA, monomial, mod) ^ crcB;
+  }
+
+  /**
+   * @param lengthB length of content corresponding to {@code crcB}, in bytes.
+   */
+  public static int compose(int crcA, int crcB, long lengthB, int mod) {
+    int monomial = getMonomial(lengthB, mod);
+    return composeWithMonomial(crcA, crcB, monomial, mod);
+  }
+
+  /**
+   * @return 4-byte array holding the big-endian representation of
+   *     {@code value}.
+   */
+  public static byte[] intToBytes(int value) {
+    byte[] buf = new byte[4];
+    try {
+      writeInt(buf, 0, value);
+    } catch (IOException ioe) {
+      // Since this should only be able to occur from code bugs within this
+      // class rather than user input, we throw as a RuntimeException
+      // rather than requiring this method to declare throwing IOException
+      // for something the caller can't control.
+      throw new RuntimeException(ioe);
+    }
+    return buf;
+  }
+
+  /**
+   * Writes big-endian representation of {@code value} into {@code buf}
+   * starting at {@code offset}. buf.length must be greater than or
+   * equal to offset + 4.
+   */
+  public static void writeInt(byte[] buf, int offset, int value)
+      throws IOException {
+    if (offset + 4  > buf.length) {
+      throw new IOException(String.format(
+          "writeInt out of bounds: buf.length=%d, offset=%d",
+          buf.length, offset));
+    }
+    buf[offset + 0] = (byte)((value >>> 24) & 0xff);
+    buf[offset + 1] = (byte)((value >>> 16) & 0xff);
+    buf[offset + 2] = (byte)((value >>> 8) & 0xff);
+    buf[offset + 3] = (byte)(value & 0xff);
+  }
+
+  /**
+   * Reads 4-byte big-endian int value from {@code buf} starting at
+   * {@code offset}. buf.length must be greater than or equal to offset + 4.
+   */
+  public static int readInt(byte[] buf, int offset)
+      throws IOException {
+    if (offset + 4  > buf.length) {
+      throw new IOException(String.format(
+          "readInt out of bounds: buf.length=%d, offset=%d",
+          buf.length, offset));
+    }
+    int value = ((buf[offset + 0] & 0xff) << 24) |
+                ((buf[offset + 1] & 0xff) << 16) |
+                ((buf[offset + 2] & 0xff) << 8)  |
+                ((buf[offset + 3] & 0xff));
+    return value;
+  }
+
+  /**
+   * For use with debug statements; verifies bytes.length on creation,
+   * expecting it to represent exactly one CRC, and returns a hex
+   * formatted value.
+   */
+  public static String toSingleCrcString(final byte[] bytes)
+      throws IOException {
+    if (bytes.length != 4) {
+      throw new IOException((String.format(
+          "Unexpected byte[] length '%d' for single CRC. Contents: %s",
+          bytes.length, Arrays.toString(bytes))));
+    }
+    return String.format("0x%08x", readInt(bytes, 0));
+  }
+
+  /**
+   * For use with debug statements; verifies bytes.length on creation,
+   * expecting it to be divisible by CRC byte size, and returns a list of
+   * hex formatted values.
+   */
+  public static String toMultiCrcString(final byte[] bytes)
+      throws IOException {
+    if (bytes.length % 4 != 0) {
+      throw new IOException((String.format(
+          "Unexpected byte[] length '%d' not divisible by 4. Contents: %s",
+          bytes.length, Arrays.toString(bytes))));
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append('[');
+    for (int i = 0; i < bytes.length; i += 4) {
+      sb.append(String.format("0x%08x", readInt(bytes, i)));
+      if (i != bytes.length - 4) {
+        sb.append(", ");
+      }
+    }
+    sb.append(']');
+    return sb.toString();
+  }
+
+  /**
+   * Galois field multiplication of {@code p} and {@code q} with the
+   * generator polynomial {@code m} as the modulus.
+   *
+   * @param m The little-endian polynomial to use as the modulus when
+   *     multiplying p and q, with implicit "1" bit beyond the bottom bit.
+   */
+  private static int galoisFieldMultiply(int p, int q, int m) {
+    int summation = 0;
+
+    // Top bit is the x^0 place; each right-shift increments the degree of the
+    // current term.
+    int curTerm = MULTIPLICATIVE_IDENTITY;
+
+    // Iteratively multiply p by x mod m as we go to represent the q[i] term
+    // (of degree x^i) times p.
+    int px = p;
+
+    while (curTerm != 0) {
+      if ((q & curTerm) != 0) {
+        summation ^= px;
+      }
+
+      // Bottom bit represents highest degree since we're little-endian; before
+      // we multiply by "x" for the next term, check bottom bit to know whether
+      // the resulting px will thus have a term matching the implicit "1" term
+      // of "m" and thus will need to subtract "m" after multiplying by "x".
+      boolean hasMaxDegree = ((px & 1) != 0);
+      px >>>= 1;
+      if (hasMaxDegree) {
+        px ^= m;
+      }
+      curTerm >>>= 1;
+    }
+    return summation;
+  }
+}
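
As a sanity check on the composition arithmetic above, and mirroring what
TestCrcUtil asserts further below, a hedged sketch showing that composing the
CRCs of two halves of a buffer reproduces the CRC of the whole buffer; the
data is random and illustrative only:

import org.apache.hadoop.util.CrcUtil;
import org.apache.hadoop.util.DataChecksum;

public class CrcComposeSketch {
  public static void main(String[] args) throws Exception {
    byte[] data = new byte[1024];
    new java.util.Random(7).nextBytes(data);

    DataChecksum crc = DataChecksum.newDataChecksum(
        DataChecksum.Type.CRC32, Integer.MAX_VALUE);

    // CRC of the whole buffer in one shot.
    crc.update(data, 0, data.length);
    int full = (int) crc.getValue();

    // CRCs of the two halves, composed with the GZIP polynomial.
    crc.reset();
    crc.update(data, 0, 512);
    int crcA = (int) crc.getValue();
    crc.reset();
    crc.update(data, 512, 512);
    int crcB = (int) crc.getValue();

    int composed = CrcUtil.compose(crcA, crcB, 512, CrcUtil.GZIP_POLYNOMIAL);
    System.out.println(composed == full);      // expected: true
  }
}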

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 43e377f..06ef8ac 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -104,6 +104,24 @@ public class DataChecksum implements Checksum {
     }
   }
 
+  /**
+   * @return the int representation of the polynomial associated with the
+   *     CRC {@code type}, suitable for use with further CRC arithmetic.
+   * @throws IOException if there is no CRC polynomial applicable
+   *     to the given {@code type}.
+   */
+  public static int getCrcPolynomialForType(Type type) throws IOException {
+    switch (type) {
+    case CRC32:
+      return CrcUtil.GZIP_POLYNOMIAL;
+    case CRC32C:
+      return CrcUtil.CASTAGNOLI_POLYNOMIAL;
+    default:
+      throw new IOException(
+          "No CRC polynomial could be associated with type: " + type);
+    }
+  }
+
  public static DataChecksum newDataChecksum(Type type, int bytesPerChecksum ) {
     if ( bytesPerChecksum <= 0 ) {
       return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java
new file mode 100644
index 0000000..f08702e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcComposer.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unittests for CrcComposer.
+ */
+public class TestCrcComposer {
+  @Rule
+  public Timeout globalTimeout = new Timeout(10000);
+
+  private Random rand = new Random(1234);
+
+  private DataChecksum.Type type = DataChecksum.Type.CRC32C;
+  private DataChecksum checksum = DataChecksum.newDataChecksum(
+      type, Integer.MAX_VALUE);
+  private int dataSize = 75;
+  private byte[] data = new byte[dataSize];
+  private int chunkSize = 10;
+  private int cellSize = 20;
+
+  private int fullCrc;
+  private int[] crcsByChunk;
+  private int[] crcsByCell;
+
+  private byte[] crcBytesByChunk;
+  private byte[] crcBytesByCell;
+
+  @Before
+  public void setup() throws IOException {
+    rand.nextBytes(data);
+    fullCrc = getRangeChecksum(data, 0, dataSize);
+
+    // 7 chunks of size chunkSize, 1 chunk of size (dataSize % chunkSize).
+    crcsByChunk = new int[8];
+    for (int i = 0; i < 7; ++i) {
+      crcsByChunk[i] = getRangeChecksum(data, i * chunkSize, chunkSize);
+    }
+    crcsByChunk[7] = getRangeChecksum(
+        data, (crcsByChunk.length - 1) * chunkSize, dataSize % chunkSize);
+
+    // 3 cells of size cellSize, 1 cell of size (dataSize % cellSize).
+    crcsByCell = new int[4];
+    for (int i = 0; i < 3; ++i) {
+      crcsByCell[i] = getRangeChecksum(data, i * cellSize, cellSize);
+    }
+    crcsByCell[3] = getRangeChecksum(
+        data, (crcsByCell.length - 1) * cellSize, dataSize % cellSize);
+
+    crcBytesByChunk = intArrayToByteArray(crcsByChunk);
+    crcBytesByCell = intArrayToByteArray(crcsByCell);
+  }
+
+  private int getRangeChecksum(byte[] buf, int offset, int length) {
+    checksum.reset();
+    checksum.update(buf, offset, length);
+    return (int) checksum.getValue();
+  }
+
+  private byte[] intArrayToByteArray(int[] values) throws IOException {
+    byte[] bytes = new byte[values.length * 4];
+    for (int i = 0; i < values.length; ++i) {
+      CrcUtil.writeInt(bytes, i * 4, values[i]);
+    }
+    return bytes;
+  }
+
+  @Test
+  public void testUnstripedIncorrectChunkSize() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+
+    // If we incorrectly specify that all CRCs ingested correspond to chunkSize
+    // when the last CRC in the array actually corresponds to
+    // dataSize % chunkSize then we expect the resulting CRC to not be equal to
+    // the fullCrc.
+    digester.update(crcBytesByChunk, 0, crcBytesByChunk.length, chunkSize);
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertNotEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUnstripedByteArray() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+    digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize);
+    digester.update(
+        crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUnstripedDataInputStream() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(crcBytesByChunk));
+    digester.update(input, crcsByChunk.length - 1, chunkSize);
+    digester.update(input, 1, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUnstripedSingleCrcs() throws IOException {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+    for (int i = 0; i < crcsByChunk.length - 1; ++i) {
+      digester.update(crcsByChunk[i], chunkSize);
+    }
+    digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testStripedByteArray() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+    digester.update(crcBytesByChunk, 0, crcBytesByChunk.length - 4, chunkSize);
+    digester.update(
+        crcBytesByChunk, crcBytesByChunk.length - 4, 4, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertArrayEquals(crcBytesByCell, digest);
+  }
+
+  @Test
+  public void testStripedDataInputStream() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(crcBytesByChunk));
+    digester.update(input, crcsByChunk.length - 1, chunkSize);
+    digester.update(input, 1, dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertArrayEquals(crcBytesByCell, digest);
+  }
+
+  @Test
+  public void testStripedSingleCrcs() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+    for (int i = 0; i < crcsByChunk.length - 1; ++i) {
+      digester.update(crcsByChunk[i], chunkSize);
+    }
+    digester.update(crcsByChunk[crcsByChunk.length - 1], dataSize % chunkSize);
+
+    byte[] digest = digester.digest();
+    assertArrayEquals(crcBytesByCell, digest);
+  }
+
+  @Test
+  public void testMultiStageMixed() throws IOException {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+
+    // First combine chunks into cells.
+    DataInputStream input =
+        new DataInputStream(new ByteArrayInputStream(crcBytesByChunk));
+    digester.update(input, crcsByChunk.length - 1, chunkSize);
+    digester.update(input, 1, dataSize % chunkSize);
+    byte[] digest = digester.digest();
+
+    // Second, individually combine cells into full crc.
+    digester =
+        CrcComposer.newCrcComposer(type, cellSize);
+    for (int i = 0; i < digest.length - 4; i += 4) {
+      int cellCrc = CrcUtil.readInt(digest, i);
+      digester.update(cellCrc, cellSize);
+    }
+    digester.update(digest, digest.length - 4, 4, dataSize % cellSize);
+    digest = digester.digest();
+    assertEquals(4, digest.length);
+    int calculatedCrc = CrcUtil.readInt(digest, 0);
+    assertEquals(fullCrc, calculatedCrc);
+  }
+
+  @Test
+  public void testUpdateMismatchesStripe() throws Exception {
+    CrcComposer digester =
+        CrcComposer.newStripedCrcComposer(type, chunkSize, cellSize);
+
+    digester.update(crcsByChunk[0], chunkSize);
+
+    // Going from chunkSize to chunkSize + cellSize will cross a cellSize
+    // boundary in a single CRC, which is not allowed, since we'd lack a
+    // CRC corresponding to the actual cellSize boundary.
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "stripe",
+        () -> digester.update(crcsByChunk[1], cellSize));
+  }
+
+  @Test
+  public void testUpdateByteArrayLengthUnalignedWithCrcSize()
+      throws Exception {
+    CrcComposer digester = CrcComposer.newCrcComposer(type, chunkSize);
+
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "length",
+        () -> digester.update(crcBytesByChunk, 0, 6, chunkSize));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java
new file mode 100644
index 0000000..a98cb8a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestCrcUtil.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unittests for CrcUtil.
+ */
+public class TestCrcUtil {
+  @Rule
+  public Timeout globalTimeout = new Timeout(10000);
+
+  private Random rand = new Random(1234);
+
+  @Test
+  public void testComposeCrc32() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, false);
+  }
+
+  @Test
+  public void testComposeCrc32c() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, false);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, false);
+  }
+
+  @Test
+  public void testComposeCrc32WithMonomial() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 512, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 511, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32, 32 * 1024 - 1, true);
+  }
+
+  @Test
+  public void testComposeCrc32cWithMonomial() throws IOException {
+    byte[] data = new byte[64 * 1024];
+    rand.nextBytes(data);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 512, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 511, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024, true);
+    doTestComposeCrc(data, DataChecksum.Type.CRC32C, 32 * 1024 - 1, true);
+  }
+
+  @Test
+  public void testComposeCrc32ZeroLength() throws IOException {
+    doTestComposeCrcZerolength(DataChecksum.Type.CRC32);
+  }
+
+  @Test
+  public void testComposeCrc32CZeroLength() throws IOException {
+    doTestComposeCrcZerolength(DataChecksum.Type.CRC32C);
+  }
+
+  /**
+   * Helper method to compare a DataChecksum-computed end-to-end CRC against
+   * a piecewise-computed CRC that uses CrcUtil.compose on "chunk CRCs"
+   * corresponding to every {@code chunkSize} bytes.
+   */
+  private static void doTestComposeCrc(
+      byte[] data, DataChecksum.Type type, int chunkSize, boolean useMonomial)
+      throws IOException {
+    int crcPolynomial = DataChecksum.getCrcPolynomialForType(type);
+
+    // Get full end-to-end CRC in a single shot first.
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        type, Integer.MAX_VALUE);
+    checksum.update(data, 0, data.length);
+    int fullCrc = (int) checksum.getValue();
+
+    // Now compute CRCs of each chunk individually first, and compose them in a
+    // second pass to compare to the end-to-end CRC.
+    int compositeCrc = 0;
+    int crcMonomial =
+        useMonomial ? CrcUtil.getMonomial(chunkSize, crcPolynomial) : 0;
+    for (int offset = 0;
+        offset + chunkSize <= data.length;
+        offset += chunkSize) {
+      checksum.reset();
+      checksum.update(data, offset, chunkSize);
+      int partialCrc = (int) checksum.getValue();
+      if (useMonomial) {
+        compositeCrc = CrcUtil.composeWithMonomial(
+            compositeCrc, partialCrc, crcMonomial, crcPolynomial);
+      } else {
+        compositeCrc = CrcUtil.compose(
+            compositeCrc, partialCrc, chunkSize, crcPolynomial);
+      }
+    }
+
+    // There may be a final partial chunk smaller than chunkSize.
+    int partialChunkSize = data.length % chunkSize;
+    if (partialChunkSize > 0) {
+      checksum.reset();
+      checksum.update(data, data.length - partialChunkSize, partialChunkSize);
+      int partialCrc = (int) checksum.getValue();
+      compositeCrc = CrcUtil.compose(
+          compositeCrc, partialCrc, partialChunkSize, crcPolynomial);
+    }
+    assertEquals(
+        String.format(
+            "Using CRC type '%s' with crcPolynomial '0x%08x' and chunkSize '%d'"
+            + ", expected '0x%08x', got '0x%08x'",
+            type, crcPolynomial, chunkSize, fullCrc, compositeCrc),
+        fullCrc,
+        compositeCrc);
+  }
+
+  /**
+   * Helper method for testing the behavior of composing a CRC with a
+   * zero-length second CRC.
+   */
+  private static void doTestComposeCrcZerolength(DataChecksum.Type type)
+      throws IOException {
+    // Without loss of generality, we can pick any integer as our fake crcA
+    // even if we don't happen to know the preimage.
+    int crcA = 0xCAFEBEEF;
+    int crcPolynomial = DataChecksum.getCrcPolynomialForType(type);
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        type, Integer.MAX_VALUE);
+    int crcB = (int) checksum.getValue();
+    assertEquals(crcA, CrcUtil.compose(crcA, crcB, 0, crcPolynomial));
+
+    int monomial = CrcUtil.getMonomial(0, crcPolynomial);
+    assertEquals(
+        crcA, CrcUtil.composeWithMonomial(crcA, crcB, monomial, crcPolynomial));
+  }
+
+  @Test
+  public void testIntSerialization() throws IOException {
+    byte[] bytes = CrcUtil.intToBytes(0xCAFEBEEF);
+    assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0));
+
+    bytes = new byte[8];
+    CrcUtil.writeInt(bytes, 0, 0xCAFEBEEF);
+    assertEquals(0xCAFEBEEF, CrcUtil.readInt(bytes, 0));
+    CrcUtil.writeInt(bytes, 4, 0xABCDABCD);
+    assertEquals(0xABCDABCD, CrcUtil.readInt(bytes, 4));
+
+    // Assert big-endian format for general Java consistency.
+    assertEquals(0xBEEFABCD, CrcUtil.readInt(bytes, 2));
+  }
+
+  @Test
+  public void testToSingleCrcStringBadLength()
+      throws Exception {
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "length",
+        () -> CrcUtil.toSingleCrcString(new byte[8]));
+  }
+
+  @Test
+  public void testToSingleCrcString() throws IOException {
+    byte[] buf = CrcUtil.intToBytes(0xcafebeef);
+    assertEquals(
+        "0xcafebeef", CrcUtil.toSingleCrcString(buf));
+  }
+
+  @Test
+  public void testToMultiCrcStringBadLength()
+      throws Exception {
+    LambdaTestUtils.intercept(
+        IOException.class,
+        "length",
+        () -> CrcUtil.toMultiCrcString(new byte[6]));
+  }
+
+  @Test
+  public void testToMultiCrcStringMultipleElements()
+      throws IOException {
+    byte[] buf = new byte[12];
+    CrcUtil.writeInt(buf, 0, 0xcafebeef);
+    CrcUtil.writeInt(buf, 4, 0xababcccc);
+    CrcUtil.writeInt(buf, 8, 0xddddefef);
+    assertEquals(
+        "[0xcafebeef, 0xababcccc, 0xddddefef]",
+        CrcUtil.toMultiCrcString(buf));
+  }
+
+  @Test
+  public void testToMultiCrcStringSingleElement()
+      throws IOException {
+    byte[] buf = new byte[4];
+    CrcUtil.writeInt(buf, 0, 0xcafebeef);
+    assertEquals(
+        "[0xcafebeef]",
+        CrcUtil.toMultiCrcString(buf));
+  }
+
+  @Test
+  public void testToMultiCrcStringNoElements()
+      throws IOException {
+    assertEquals(
+        "[]",
+        CrcUtil.toMultiCrcString(new byte[0]));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java
index 0138195..d07d5a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java
@@ -130,9 +130,9 @@ public class Hdfs extends AbstractFileSystem {
   }
 
   @Override
-  public FileChecksum getFileChecksum(Path f) 
+  public FileChecksum getFileChecksum(Path f)
       throws IOException, UnresolvedLinkException {
-    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
+    return dfs.getFileChecksumWithCombineMode(getUriPath(f), Long.MAX_VALUE);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0875328..09154d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,6 +77,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumCombineMode;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
@@ -1753,18 +1755,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return encryptionKey;
   }
 
-  /**
-   * Get the checksum of the whole file or a range of the file. Note that the
-   * range always starts from the beginning of the file. The file can be
-   * in replicated form, or striped mode. It can be used to checksum and compare
-   * two replicated files, or two striped files, but not applicable for two
-   * files of different block layout forms.
-   * @param src The file path
-   * @param length the length of the range, i.e., the range is [0, length]
-   * @return The checksum
-   * @see DistributedFileSystem#getFileChecksum(Path)
-   */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+  private FileChecksum getFileChecksumInternal(
+      String src, long length, ChecksumCombineMode combineMode)
       throws IOException {
     checkOpen();
     Preconditions.checkArgument(length >= 0);
@@ -1779,15 +1771,51 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
     maker = ecPolicy != null ?
         new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src,
-            length, blockLocations, namenode, this, ecPolicy) :
+            length, blockLocations, namenode, this, ecPolicy, combineMode) :
         new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
-            blockLocations, namenode, this);
+            blockLocations, namenode, this, combineMode);
 
     maker.compute();
 
     return maker.getFileChecksum();
   }
 
+  /**
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file. The file can be
+   * in replicated form, or striped mode. Depending on the
+   * dfs.checksum.combine.mode, checksums may or may not be comparable between
+   * different block layout forms.
+   *
+   * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length]
+   * @return The checksum
+   * @see DistributedFileSystem#getFileChecksum(Path)
+   */
+  public FileChecksum getFileChecksumWithCombineMode(String src, long length)
+      throws IOException {
+    ChecksumCombineMode combineMode = getConf().getChecksumCombineMode();
+    return getFileChecksumInternal(src, length, combineMode);
+  }
+
+  /**
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file. The file can be
+   * in replicated form, or striped mode. It can be used to checksum and compare
+   * two replicated files, or two striped files, but not applicable for two
+   * files of different block layout forms.
+   *
+   * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length]
+   * @return The checksum
+   * @see DistributedFileSystem#getFileChecksum(Path)
+   */
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+      throws IOException {
+    return (MD5MD5CRC32FileChecksum) getFileChecksumInternal(
+        src, length, ChecksumCombineMode.MD5MD5CRC);
+  }
+
   protected LocatedBlocks getBlockLocations(String src,
                                             long length) throws IOException {
     //get block locations for the file range
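
Because getFileChecksumWithCombineMode returns the general FileChecksum type
rather than MD5MD5CRC32FileChecksum, downstream code that cares about the
concrete flavor can branch on it; a hypothetical helper (not part of this
change) sketching that distinction:

import org.apache.hadoop.fs.CompositeCrcFileChecksum;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;

/** Hypothetical helper describing whichever checksum flavor the client returned. */
public final class FileChecksumDescriber {
  private FileChecksumDescriber() {
  }

  public static String describe(FileChecksum checksum) {
    if (checksum instanceof CompositeCrcFileChecksum) {
      // Produced when dfs.checksum.combine.mode selects COMPOSITE_CRC:
      // comparable across replicated and striped layouts.
      return "composite: " + checksum;
    } else if (checksum instanceof MD5MD5CRC32FileChecksum) {
      // Default MD5MD5CRC combine mode: tied to the block/chunk layout.
      return "md5-of-md5s: " + checksum;
    } else {
      return "other: " + checksum;
    }
  }
}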

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 03cb317..1e9ed09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -1681,7 +1681,8 @@ public class DistributedFileSystem extends FileSystem
     return new FileSystemLinkResolver<FileChecksum>() {
       @Override
       public FileChecksum doCall(final Path p) throws IOException {
-        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
+        return dfs.getFileChecksumWithCombineMode(
+            getPathName(p), Long.MAX_VALUE);
       }
 
       @Override
@@ -1701,7 +1702,7 @@ public class DistributedFileSystem extends FileSystem
     return new FileSystemLinkResolver<FileChecksum>() {
       @Override
       public FileChecksum doCall(final Path p) throws IOException {
-        return dfs.getFileChecksum(getPathName(p), length);
+        return dfs.getFileChecksumWithCombineMode(getPathName(p), length);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
index 72cf147..8f807d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.CompositeCrcFileChecksum;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.fs.Options.ChecksumCombineMode;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -41,6 +46,8 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,9 +74,11 @@ final class FileChecksumHelper {
     private final long length;
     private final DFSClient client;
     private final ClientProtocol namenode;
-    private final DataOutputBuffer md5out = new DataOutputBuffer();
+    private final ChecksumCombineMode combineMode;
+    private final BlockChecksumType blockChecksumType;
+    private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer();
 
-    private MD5MD5CRC32FileChecksum fileChecksum;
+    private FileChecksum fileChecksum;
     private LocatedBlocks blockLocations;
 
     private int timeout;
@@ -88,12 +97,24 @@ final class FileChecksumHelper {
     FileChecksumComputer(String src, long length,
                          LocatedBlocks blockLocations,
                          ClientProtocol namenode,
-                         DFSClient client) throws IOException {
+                         DFSClient client,
+                         ChecksumCombineMode combineMode) throws IOException {
       this.src = src;
       this.length = length;
       this.blockLocations = blockLocations;
       this.namenode = namenode;
       this.client = client;
+      this.combineMode = combineMode;
+      switch (combineMode) {
+      case MD5MD5CRC:
+        this.blockChecksumType = BlockChecksumType.MD5CRC;
+        break;
+      case COMPOSITE_CRC:
+        this.blockChecksumType = BlockChecksumType.COMPOSITE_CRC;
+        break;
+      default:
+        throw new IOException("Unknown ChecksumCombineMode: " + combineMode);
+      }
 
       this.remaining = length;
 
@@ -121,11 +142,19 @@ final class FileChecksumHelper {
       return namenode;
     }
 
-    DataOutputBuffer getMd5out() {
-      return md5out;
+    ChecksumCombineMode getCombineMode() {
+      return combineMode;
+    }
+
+    BlockChecksumType getBlockChecksumType() {
+      return blockChecksumType;
+    }
+
+    DataOutputBuffer getBlockChecksumBuf() {
+      return blockChecksumBuf;
     }
 
-    MD5MD5CRC32FileChecksum getFileChecksum() {
+    FileChecksum getFileChecksum() {
       return fileChecksum;
     }
 
@@ -226,17 +255,31 @@ final class FileChecksumHelper {
     }
 
     /**
-     * Compute and aggregate block checksums block by block.
+     * Compute block checksums block by block and append the raw bytes of the
+     * block checksums into getBlockChecksumBuf().
+     *
      * @throws IOException
      */
     abstract void checksumBlocks() throws IOException;
 
     /**
-     * Make final file checksum result given the computing process done.
+     * Make final file checksum result given the per-block or per-block-group
+     * checksums collected into getBlockChecksumBuf().
      */
-    MD5MD5CRC32FileChecksum makeFinalResult() {
+    FileChecksum makeFinalResult() throws IOException {
+      switch (combineMode) {
+      case MD5MD5CRC:
+        return makeMd5CrcResult();
+      case COMPOSITE_CRC:
+        return makeCompositeCrcResult();
+      default:
+        throw new IOException("Unknown ChecksumCombineMode: " + combineMode);
+      }
+    }
+
+    FileChecksum makeMd5CrcResult() {
       //compute file MD5
-      final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
+      final MD5Hash fileMD5 = MD5Hash.digest(blockChecksumBuf.getData());
       switch (crcType) {
       case CRC32:
         return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
@@ -250,6 +293,58 @@ final class FileChecksumHelper {
       }
     }
 
+    FileChecksum makeCompositeCrcResult() throws IOException {
+      long blockSizeHint = 0;
+      if (locatedBlocks.size() > 0) {
+        blockSizeHint = locatedBlocks.get(0).getBlockSize();
+      }
+      CrcComposer crcComposer =
+          CrcComposer.newCrcComposer(getCrcType(), blockSizeHint);
+      byte[] blockChecksumBytes = blockChecksumBuf.getData();
+
+      long sumBlockLengths = 0;
+      for (int i = 0; i < locatedBlocks.size() - 1; ++i) {
+        LocatedBlock block = locatedBlocks.get(i);
+        // For everything except the last LocatedBlock, we expect getBlockSize()
+        // to accurately reflect the number of file bytes digested in the block
+        // checksum.
+        sumBlockLengths += block.getBlockSize();
+        int blockCrc = CrcUtil.readInt(blockChecksumBytes, i * 4);
+
+        crcComposer.update(blockCrc, block.getBlockSize());
+        LOG.debug(
+            "Added blockCrc 0x{} for block index {} of size {}",
+            Integer.toString(blockCrc, 16), i, block.getBlockSize());
+      }
+
+      // NB: In some cases the located blocks have their block size adjusted
+      // explicitly based on the requested length, but not all cases;
+      // these numbers may or may not reflect actual sizes on disk.
+      long reportedLastBlockSize =
+          blockLocations.getLastLocatedBlock().getBlockSize();
+      long consumedLastBlockLength = reportedLastBlockSize;
+      if (length - sumBlockLengths < reportedLastBlockSize) {
+        LOG.warn(
+            "Last block length {} is less than reportedLastBlockSize {}",
+            length - sumBlockLengths, reportedLastBlockSize);
+        consumedLastBlockLength = length - sumBlockLengths;
+      }
+      // NB: blockChecksumBytes.length may be much longer than actual bytes
+      // written into the DataOutput.
+      int lastBlockCrc = CrcUtil.readInt(
+          blockChecksumBytes, 4 * (locatedBlocks.size() - 1));
+      crcComposer.update(lastBlockCrc, consumedLastBlockLength);
+      LOG.debug(
+          "Added lastBlockCrc 0x{} for block index {} of size {}",
+          Integer.toString(lastBlockCrc, 16),
+          locatedBlocks.size() - 1,
+          consumedLastBlockLength);
+
+      int compositeCrc = CrcUtil.readInt(crcComposer.digest(), 0);
+      return new CompositeCrcFileChecksum(
+          compositeCrc, getCrcType(), bytesPerCRC);
+    }
+
     /**
      * Create and return a sender given an IO stream pair.
      */
@@ -267,6 +362,117 @@ final class FileChecksumHelper {
         IOUtils.closeStream(pair.out);
       }
     }
+
+    /**
+     * Parses out various checksum properties like bytesPerCrc, crcPerBlock,
+     * and crcType from {@code checksumData} and either stores them as the
+     * authoritative value or compares them to a previously extracted value
+     * to check compatibility.
+     *
+     * @param checksumData response from the datanode
+     * @param locatedBlock the block corresponding to the response
+     * @param datanode the datanode which produced the response
+     * @param blockIdx the block or block-group index of the response
+     */
+    void extractChecksumProperties(
+        OpBlockChecksumResponseProto checksumData,
+        LocatedBlock locatedBlock,
+        DatanodeInfo datanode,
+        int blockIdx)
+        throws IOException {
+      //read byte-per-checksum
+      final int bpc = checksumData.getBytesPerCrc();
+      if (blockIdx == 0) { //first block
+        setBytesPerCRC(bpc);
+      } else if (bpc != getBytesPerCRC()) {
+        if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) {
+          LOG.warn(
+              "Current bytesPerCRC={} doesn't match next bpc={}, but "
+              + "continuing anyway because we're using COMPOSITE_CRC. "
+              + "If trying to preserve CHECKSUMTYPE, only the current "
+              + "bytesPerCRC will be preserved.", getBytesPerCRC(), bpc);
+        } else {
+          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+              + " but bytesPerCRC=" + getBytesPerCRC());
+        }
+      }
+
+      //read crc-per-block
+      final long cpb = checksumData.getCrcPerBlock();
+      if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
+        setCrcPerBlock(cpb);
+      }
+
+      // read crc-type
+      final DataChecksum.Type ct;
+      if (checksumData.hasCrcType()) {
+        ct = PBHelperClient.convert(checksumData.getCrcType());
+      } else {
+        LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+            "inferring checksum by reading first byte");
+        ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
+      }
+
+      if (blockIdx == 0) {
+        setCrcType(ct);
+      } else if (getCrcType() != DataChecksum.Type.MIXED &&
+          getCrcType() != ct) {
+        if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) {
+          throw new IOException(
+              "DataChecksum.Type.MIXED is not supported for COMPOSITE_CRC");
+        } else {
+          // if crc types are mixed in a file
+          setCrcType(DataChecksum.Type.MIXED);
+        }
+      }
+
+      if (blockIdx == 0) {
+        LOG.debug("set bytesPerCRC={}, crcPerBlock={}",
+            getBytesPerCRC(), getCrcPerBlock());
+      }
+    }
+
+    /**
+     * Parses out the raw blockChecksum bytes from {@code checksumData}
+     * according to the blockChecksumType and populates the cumulative
+     * blockChecksumBuf with it.
+     *
+     * @return a debug-string representation of the parsed checksum if
+     *     debug is enabled, otherwise null.
+     */
+    String populateBlockChecksumBuf(OpBlockChecksumResponseProto checksumData)
+        throws IOException {
+      String blockChecksumForDebug = null;
+      switch (getBlockChecksumType()) {
+      case MD5CRC:
+        //read md5
+        final MD5Hash md5 = new MD5Hash(
+            checksumData.getBlockChecksum().toByteArray());
+        md5.write(getBlockChecksumBuf());
+        if (LOG.isDebugEnabled()) {
+          blockChecksumForDebug = md5.toString();
+        }
+        break;
+      case COMPOSITE_CRC:
+        BlockChecksumType returnedType = PBHelperClient.convert(
+            checksumData.getBlockChecksumOptions().getBlockChecksumType());
+        if (returnedType != BlockChecksumType.COMPOSITE_CRC) {
+          throw new IOException(String.format(
+              "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC",
+              returnedType));
+        }
+        byte[] crcBytes = checksumData.getBlockChecksum().toByteArray();
+        if (LOG.isDebugEnabled()) {
+          blockChecksumForDebug = CrcUtil.toSingleCrcString(crcBytes);
+        }
+        getBlockChecksumBuf().write(crcBytes);
+        break;
+      default:
+        throw new IOException(
+            "Unknown BlockChecksumType: " + getBlockChecksumType());
+      }
+      return blockChecksumForDebug;
+    }
   }
 
   /**
@@ -278,8 +484,10 @@ final class FileChecksumHelper {
     ReplicatedFileChecksumComputer(String src, long length,
                                    LocatedBlocks blockLocations,
                                    ClientProtocol namenode,
-                                   DFSClient client) throws IOException {
-      super(src, length, blockLocations, namenode, client);
+                                   DFSClient client,
+                                   ChecksumCombineMode combineMode)
+        throws IOException {
+      super(src, length, blockLocations, namenode, client, combineMode);
     }
 
     @Override
@@ -295,7 +503,8 @@ final class FileChecksumHelper {
         LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
 
         if (!checksumBlock(locatedBlock)) {
-          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+          throw new PathIOException(
+              getSrc(), "Fail to get block MD5 for " + locatedBlock);
         }
       }
     }
@@ -368,9 +577,11 @@ final class FileChecksumHelper {
         LOG.debug("write to {}: {}, block={}", datanode,
             Op.BLOCK_CHECKSUM, block);
 
-        // get block MD5
-        createSender(pair).blockChecksum(block,
-            locatedBlock.getBlockToken());
+        // get block checksum
+        createSender(pair).blockChecksum(
+            block,
+            locatedBlock.getBlockToken(),
+            new BlockChecksumOptions(getBlockChecksumType()));
 
         final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
             PBHelperClient.vintPrefixed(pair.in));
@@ -381,51 +592,11 @@ final class FileChecksumHelper {
 
         OpBlockChecksumResponseProto checksumData =
             reply.getChecksumResponse();
-
-        //read byte-per-checksum
-        final int bpc = checksumData.getBytesPerCrc();
-        if (blockIdx == 0) { //first block
-          setBytesPerCRC(bpc);
-        } else if (bpc != getBytesPerCRC()) {
-          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-              + " but bytesPerCRC=" + getBytesPerCRC());
-        }
-
-        //read crc-per-block
-        final long cpb = checksumData.getCrcPerBlock();
-        if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
-          setCrcPerBlock(cpb);
-        }
-
-        //read md5
-        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
-        md5.write(getMd5out());
-
-        // read crc-type
-        final DataChecksum.Type ct;
-        if (checksumData.hasCrcType()) {
-          ct = PBHelperClient.convert(checksumData.getCrcType());
-        } else {
-          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-              "inferring checksum by reading first byte");
-          ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
-        }
-
-        if (blockIdx == 0) { // first block
-          setCrcType(ct);
-        } else if (getCrcType() != DataChecksum.Type.MIXED
-            && getCrcType() != ct) {
-          // if crc types are mixed in a file
-          setCrcType(DataChecksum.Type.MIXED);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          if (blockIdx == 0) {
-            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-                + ", crcPerBlock=" + getCrcPerBlock());
-          }
-          LOG.debug("got reply from " + datanode + ": md5=" + md5);
-        }
+        extractChecksumProperties(
+            checksumData, locatedBlock, datanode, blockIdx);
+        String blockChecksumForDebug = populateBlockChecksumBuf(checksumData);
+        LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}",
+            datanode, blockChecksumForDebug, getBlockChecksumType());
       }
     }
   }
@@ -442,9 +613,10 @@ final class FileChecksumHelper {
                                           LocatedBlocks blockLocations,
                                           ClientProtocol namenode,
                                           DFSClient client,
-                                          ErasureCodingPolicy ecPolicy)
+                                          ErasureCodingPolicy ecPolicy,
+                                          ChecksumCombineMode combineMode)
         throws IOException {
-      super(src, length, blockLocations, namenode, client);
+      super(src, length, blockLocations, namenode, client, combineMode);
 
       this.ecPolicy = ecPolicy;
     }
@@ -464,7 +636,8 @@ final class FileChecksumHelper {
         LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
 
         if (!checksumBlockGroup(blockGroup)) {
-          throw new IOException("Fail to get block MD5 for " + locatedBlock);
+          throw new PathIOException(
+              getSrc(), "Fail to get block checksum for " + locatedBlock);
         }
       }
     }
@@ -519,16 +692,18 @@ final class FileChecksumHelper {
                              StripedBlockInfo stripedBlockInfo,
                              DatanodeInfo datanode,
                              long requestedNumBytes) throws IOException {
-
       try (IOStreamPair pair = getClient().connectToDN(datanode,
           getTimeout(), blockGroup.getBlockToken())) {
 
         LOG.debug("write to {}: {}, blockGroup={}",
             datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);
 
-        // get block MD5
-        createSender(pair).blockGroupChecksum(stripedBlockInfo,
-            blockGroup.getBlockToken(), requestedNumBytes);
+        // get block group checksum
+        createSender(pair).blockGroupChecksum(
+            stripedBlockInfo,
+            blockGroup.getBlockToken(),
+            requestedNumBytes,
+            new BlockChecksumOptions(getBlockChecksumType()));
 
         BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
             PBHelperClient.vintPrefixed(pair.in));
@@ -538,54 +713,10 @@ final class FileChecksumHelper {
         DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
 
         OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
-
-        //read byte-per-checksum
-        final int bpc = checksumData.getBytesPerCrc();
-        if (bgIdx == 0) { //first block
-          setBytesPerCRC(bpc);
-        } else {
-          if (bpc != getBytesPerCRC()) {
-            throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
-                + " but bytesPerCRC=" + getBytesPerCRC());
-          }
-        }
-
-        //read crc-per-block
-        final long cpb = checksumData.getCrcPerBlock();
-        if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
-          setCrcPerBlock(cpb);
-        }
-
-        //read md5
-        final MD5Hash md5 = new MD5Hash(
-            checksumData.getMd5().toByteArray());
-        md5.write(getMd5out());
-
-        // read crc-type
-        final DataChecksum.Type ct;
-        if (checksumData.hasCrcType()) {
-          ct = PBHelperClient.convert(checksumData.getCrcType());
-        } else {
-          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
-              "inferring checksum by reading first byte");
-          ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
-        }
-
-        if (bgIdx == 0) {
-          setCrcType(ct);
-        } else if (getCrcType() != DataChecksum.Type.MIXED &&
-            getCrcType() != ct) {
-          // if crc types are mixed in a file
-          setCrcType(DataChecksum.Type.MIXED);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          if (bgIdx == 0) {
-            LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
-                + ", crcPerBlock=" + getCrcPerBlock());
-          }
-          LOG.debug("got reply from " + datanode + ": md5=" + md5);
-        }
+        extractChecksumProperties(checksumData, blockGroup, datanode, bgIdx);
+        String blockChecksumForDebug = populateBlockChecksumBuf(checksumData);
+        LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}",
+            datanode, blockChecksumForDebug, getBlockChecksumType());
       }
     }
   }

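For context, a minimal sketch of the composition step that makeCompositeCrcResult() performs above, using the CrcComposer and CrcUtil helpers added by this patch (assumed importable from org.apache.hadoop.util); the class and method names here are illustrative, the CRC type is fixed to CRC32C for brevity, and the last-block length adjustment is omitted:

    import java.io.IOException;

    import org.apache.hadoop.util.CrcComposer;
    import org.apache.hadoop.util.CrcUtil;
    import org.apache.hadoop.util.DataChecksum;

    /** Illustrative only; not part of this patch. */
    public class CompositeCrcSketch {
      /**
       * Folds per-block CRCs into one file-level CRC; blockCrcs[i] covers
       * blockLengths[i] bytes of the file, in order.
       */
      public static int composeFileCrc(int[] blockCrcs, long[] blockLengths)
          throws IOException {
        // blockLengths[0] plays the role of the blockSizeHint passed in
        // makeCompositeCrcResult() above.
        CrcComposer composer =
            CrcComposer.newCrcComposer(DataChecksum.Type.CRC32C, blockLengths[0]);
        for (int i = 0; i < blockCrcs.length; i++) {
          // Each update extends the running composite CRC by a segment of the
          // given length, mirroring the per-block loop above.
          composer.update(blockCrcs[i], blockLengths[i]);
        }
        // digest() returns the composed CRC as bytes; readInt unpacks the value.
        return CrcUtil.readInt(composer.digest(), 0);
      }
    }
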
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 52a7cd0..f2cec31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -120,6 +120,8 @@ public interface HdfsClientConfigKeys {
   String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  String  DFS_CHECKSUM_COMBINE_MODE_KEY = "dfs.checksum.combine.mode";
+  String  DFS_CHECKSUM_COMBINE_MODE_DEFAULT = "MD5MD5CRC";
   String  DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
       "dfs.datanode.socket.write.timeout";
   String  DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC =

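As a usage note for the keys added above, a minimal client-side sketch of selecting composite CRC file checksums; the path and class name are illustrative, and the standard FileSystem API is assumed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CombineModeExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default is MD5MD5CRC (the legacy MD5-of-MD5-of-CRC file checksum);
        // COMPOSITE_CRC selects the new block-layout-independent checksum.
        conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");

        FileSystem fs = FileSystem.get(conf);
        // With COMPOSITE_CRC, getFileChecksum is expected to return a
        // CompositeCrcFileChecksum for HDFS files.
        FileChecksum checksum = fs.getFileChecksum(new Path("/tmp/example.txt"));
        System.out.println(checksum);
      }
    }
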
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 2703617..e63e3f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Options.ChecksumCombineMode;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
@@ -38,6 +39,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
@@ -106,6 +109,7 @@ public class DfsClientConf {
   private final int datanodeSocketWriteTimeout;
   private final int ioBufferSize;
   private final ChecksumOpt defaultChecksumOpt;
+  private final ChecksumCombineMode checksumCombineMode;
   private final int writePacketSize;
   private final int writeMaxPackets;
   private final ByteArrayManager.Conf writeByteArrayManagerConf;
@@ -177,6 +181,7 @@ public class DfsClientConf {
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
         CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
     defaultChecksumOpt = getChecksumOptFromConf(conf);
+    checksumCombineMode = getChecksumCombineModeFromConf(conf);
     dataTransferTcpNoDelay = conf.getBoolean(
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
@@ -300,6 +305,21 @@ public class DfsClientConf {
     }
   }
 
+  private static ChecksumCombineMode getChecksumCombineModeFromConf(
+      Configuration conf) {
+    final String mode = conf.get(
+        DFS_CHECKSUM_COMBINE_MODE_KEY,
+        DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+    try {
+      return ChecksumCombineMode.valueOf(mode);
+    } catch(IllegalArgumentException iae) {
+      LOG.warn("Bad checksum combine mode: {}. Using default {}", mode,
+               DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+      return ChecksumCombineMode.valueOf(
+          DFS_CHECKSUM_COMBINE_MODE_DEFAULT);
+    }
+  }
+
   // Construct a checksum option from conf
   public static ChecksumOpt getChecksumOptFromConf(Configuration conf) {
     DataChecksum.Type type = getChecksumType(conf);
@@ -393,6 +413,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the checksumCombineMode
+   */
+  public ChecksumCombineMode getChecksumCombineMode() {
+    return checksumCombineMode;
+  }
+
+  /**
    * @return the writePacketSize
    */
   public int getWritePacketSize() {

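To illustrate the fallback in getChecksumCombineModeFromConf() above, a hedged sketch; the public DfsClientConf(Configuration) constructor is assumed from existing usage, and the class name is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Options.ChecksumCombineMode;
    import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

    public class CombineModeConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // A recognized value is parsed directly.
        conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
        ChecksumCombineMode mode =
            new DfsClientConf(conf).getChecksumCombineMode();
        System.out.println(mode);  // COMPOSITE_CRC

        // An unrecognized value logs a warning and falls back to the default
        // (MD5MD5CRC) instead of failing client construction.
        conf.set("dfs.checksum.combine.mode", "NOT_A_MODE");
        mode = new DfsClientConf(conf).getChecksumCombineMode();
        System.out.println(mode);  // MD5MD5CRC
      }
    }
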
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java
new file mode 100644
index 0000000..82e07d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumOptions.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Encapsulates various options related to how fine-grained data checksums are
+ * combined into block-level checksums.
+ */
+@InterfaceAudience.Private
+public class BlockChecksumOptions {
+  private final BlockChecksumType blockChecksumType;
+  private final long stripeLength;
+
+  public BlockChecksumOptions(
+      BlockChecksumType blockChecksumType, long stripeLength) {
+    this.blockChecksumType = blockChecksumType;
+    this.stripeLength = stripeLength;
+  }
+
+  public BlockChecksumOptions(BlockChecksumType blockChecksumType) {
+    this(blockChecksumType, 0);
+  }
+
+  public BlockChecksumType getBlockChecksumType() {
+    return blockChecksumType;
+  }
+
+  public long getStripeLength() {
+    return stripeLength;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("blockChecksumType=%s, stripeLength=%d",
+        blockChecksumType, stripeLength);
+  }
+}

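A brief illustration of the two constructors; the stripe length value is purely illustrative:

    import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
    import org.apache.hadoop.hdfs.protocol.BlockChecksumType;

    public class BlockChecksumOptionsSketch {
      public static void main(String[] args) {
        // Replicated block: no striping, so the stripe length stays 0.
        BlockChecksumOptions replicated =
            new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC);

        // Striped (erasure-coded) request with an explicit, illustrative
        // stripe length.
        BlockChecksumOptions striped =
            new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC, 64 * 1024);

        System.out.println(replicated);  // stripe length 0
        System.out.println(striped);     // stripe length 65536
      }
    }
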
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java
new file mode 100644
index 0000000..cc33660
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockChecksumType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Algorithms/types denoting how block-level checksums are computed using
+ * lower-level chunk checksums/CRCs.
+ */
+@InterfaceAudience.Private
+public enum BlockChecksumType {
+  MD5CRC,  // BlockChecksum obtained by taking the MD5 digest of chunk CRCs
+  COMPOSITE_CRC  // Chunk-independent CRC, optionally striped
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index fe20c37..384f1dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
@@ -214,11 +215,13 @@ public interface DataTransferProtocol {
    *
    * @param blk a block.
    * @param blockToken security token for accessing the block.
+   * @param blockChecksumOptions determines how the block-level checksum is
+   *     computed from underlying block metadata.
    * @throws IOException
    */
   void blockChecksum(ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken) throws IOException;
-
+      Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions) throws IOException;
 
   /**
    * Get striped block group checksum (MD5 of CRC32).
@@ -227,9 +230,12 @@ public interface DataTransferProtocol {
    * @param blockToken security token for accessing the block.
    * @param requestedNumBytes requested number of bytes in the block group
    *                          to compute the checksum.
+   * @param blockChecksumOptions determines how the block-level checksum is
+   *     computed from underlying block metadata.
    * @throws IOException
    */
   void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
           Token<BlockTokenIdentifier> blockToken,
-          long requestedNumBytes) throws IOException;
+          long requestedNumBytes,
+          BlockChecksumOptions blockChecksumOptions) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 8a8d20d..7526f96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
@@ -267,9 +268,11 @@ public class Sender implements DataTransferProtocol {
 
   @Override
   public void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      final Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+        .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions))
         .build();
 
     send(out, Op.BLOCK_CHECKSUM, proto);
@@ -277,8 +280,9 @@ public class Sender implements DataTransferProtocol {
 
   @Override
   public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
-      Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
-          throws IOException {
+      Token<BlockTokenIdentifier> blockToken,
+      long requestedNumBytes,
+      BlockChecksumOptions blockChecksumOptions) throws IOException {
     OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
         .setHeader(DataTransferProtoUtil.buildBaseHeader(
             stripedBlockInfo.getBlock(), blockToken))
@@ -291,6 +295,7 @@ public class Sender implements DataTransferProtocol {
         .setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
             stripedBlockInfo.getErasureCodingPolicy()))
         .setRequestedNumBytes(requestedNumBytes)
+        .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions))
         .build();
 
     send(out, Op.BLOCK_GROUP_CHECKSUM, proto);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index d9e7aa0..ff9733c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -247,6 +249,48 @@ public class PBHelperClient {
     return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
   }
 
+  public static HdfsProtos.BlockChecksumTypeProto convert(
+      BlockChecksumType type) {
+    switch(type) {
+    case MD5CRC:
+      return HdfsProtos.BlockChecksumTypeProto.MD5CRC;
+    case COMPOSITE_CRC:
+      return HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC;
+    default:
+      throw new IllegalStateException(
+          "BUG: BlockChecksumType not found, type=" + type);
+    }
+  }
+
+  public static BlockChecksumType convert(
+      HdfsProtos.BlockChecksumTypeProto blockChecksumTypeProto) {
+    switch(blockChecksumTypeProto) {
+    case MD5CRC:
+      return BlockChecksumType.MD5CRC;
+    case COMPOSITE_CRC:
+      return BlockChecksumType.COMPOSITE_CRC;
+    default:
+      throw new IllegalStateException(
+          "BUG: BlockChecksumTypeProto not found, type="
+          + blockChecksumTypeProto);
+    }
+  }
+
+  public static HdfsProtos.BlockChecksumOptionsProto convert(
+      BlockChecksumOptions options) {
+    return HdfsProtos.BlockChecksumOptionsProto.newBuilder()
+        .setBlockChecksumType(convert(options.getBlockChecksumType()))
+        .setStripeLength(options.getStripeLength())
+        .build();
+  }
+
+  public static BlockChecksumOptions convert(
+      HdfsProtos.BlockChecksumOptionsProto options) {
+    return new BlockChecksumOptions(
+        convert(options.getBlockChecksumType()),
+        options.getStripeLength());
+  }
+
   public static ExtendedBlockProto convert(final ExtendedBlock b) {
     if (b == null) return null;
     return ExtendedBlockProto.newBuilder().


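Finally, a minimal round-trip sketch for the converters added above; the generated protobuf class is assumed to be org.apache.hadoop.hdfs.protocol.proto.HdfsProtos, matching the HdfsProtos references in this diff:

    import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
    import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
    import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

    public class BlockChecksumOptionsPBSketch {
      public static void main(String[] args) {
        BlockChecksumOptions options =
            new BlockChecksumOptions(BlockChecksumType.COMPOSITE_CRC, 0);

        // Java -> protobuf: both the type and the stripe length are carried over.
        HdfsProtos.BlockChecksumOptionsProto proto = PBHelperClient.convert(options);

        // protobuf -> Java: the round trip should preserve both fields.
        BlockChecksumOptions back = PBHelperClient.convert(proto);
        System.out.println(back.getBlockChecksumType());  // COMPOSITE_CRC
        System.out.println(back.getStripeLength());       // 0
      }
    }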