Repository: hadoop
Updated Branches:
  refs/heads/trunk 6cc59a09e -> 7c9cdad6d


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 2356201..384da54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -148,8 +148,9 @@ message OpCopyBlockProto {
   required BaseHeaderProto header = 1;
 }
 
-message OpBlockChecksumProto { 
+message OpBlockChecksumProto {
   required BaseHeaderProto header = 1;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 2;
 }
 
 message OpBlockGroupChecksumProto {
@@ -160,6 +161,7 @@ message OpBlockGroupChecksumProto {
   required ErasureCodingPolicyProto ecPolicy = 4;
   repeated uint32 blockIndices = 5;
   required uint64 requestedNumBytes = 6;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 7;
 }
 
 /**
@@ -313,8 +315,9 @@ message DNTransferAckProto {
 message OpBlockChecksumResponseProto {
   required uint32 bytesPerCrc = 1;
   required uint64 crcPerBlock = 2;
-  required bytes md5 = 3;
+  required bytes blockChecksum = 3;
   optional ChecksumTypeProto crcType = 4;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 5;
 }
 
 message OpCustomProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 29d0b4e..441b9d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -480,6 +480,27 @@ enum ChecksumTypeProto {
   CHECKSUM_CRC32C = 2;
 }
 
+enum BlockChecksumTypeProto {
+  MD5CRC = 1;  // BlockChecksum obtained by taking the MD5 digest of chunk CRCs
+  COMPOSITE_CRC = 2;  // Chunk-independent CRC, optionally striped
+}
+
+/**
+ * Algorithms/types denoting how block-level checksums are computed using
+ * lower-level chunk checksums/CRCs.
+ * These options should be kept in sync with
+ * org.apache.hadoop.hdfs.protocol.BlockChecksumOptions.
+ */
+message BlockChecksumOptionsProto {
+  optional BlockChecksumTypeProto blockChecksumType = 1 [default = MD5CRC];
+
+  // Only used if blockChecksumType specifies a striped format, such as
+  // COMPOSITE_CRC. If so, then the blockChecksum in the response is expected
+  // to be the concatenation of N crcs, where
+  // N == ((requestedLength - 1) / stripeLength) + 1
+  optional uint64 stripeLength = 2;
+}
+
 /**
  * HDFS Server Defaults
  */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index bab2e8d..5d2d1f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -301,8 +301,9 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-    blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
-        PBHelperClient.convert(proto.getHeader().getToken()));
+      blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
+          PBHelperClient.convert(proto.getHeader().getToken()),
+          PBHelperClient.convert(proto.getBlockChecksumOptions()));
     } finally {
       if (traceScope != null) traceScope.close();
     }
@@ -325,7 +326,8 @@ public abstract class Receiver implements DataTransferProtocol {
     try {
       blockGroupChecksum(stripedBlockInfo,
           PBHelperClient.convert(proto.getHeader().getToken()),
-          proto.getRequestedNumBytes());
+          proto.getRequestedNumBytes(),
+          PBHelperClient.convert(proto.getBlockChecksumOptions()));
     } finally {
       if (traceScope != null) {
         traceScope.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index e99911b..3388855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -32,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumCompositeCrcReconstructor;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumMd5CrcReconstructor;
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor;
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -40,6 +44,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +76,7 @@ final class BlockChecksumHelper {
    */
   static abstract class AbstractBlockChecksumComputer {
     private final DataNode datanode;
+    private final BlockChecksumOptions blockChecksumOptions;
 
     private byte[] outBytes;
     private int bytesPerCRC = -1;
@@ -77,8 +84,11 @@ final class BlockChecksumHelper {
     private long crcPerBlock = -1;
     private int checksumSize = -1;
 
-    AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
+    AbstractBlockChecksumComputer(
+        DataNode datanode,
+        BlockChecksumOptions blockChecksumOptions) throws IOException {
       this.datanode = datanode;
+      this.blockChecksumOptions = blockChecksumOptions;
     }
 
     abstract void compute() throws IOException;
@@ -92,6 +102,10 @@ final class BlockChecksumHelper {
       return datanode;
     }
 
+    BlockChecksumOptions getBlockChecksumOptions() {
+      return blockChecksumOptions;
+    }
+
     InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
         throws IOException {
       return datanode.data.getBlockInputStream(block, seekOffset);
@@ -155,8 +169,10 @@ final class BlockChecksumHelper {
     private DataChecksum checksum;
 
     BlockChecksumComputer(DataNode datanode,
-                          ExtendedBlock block) throws IOException {
-      super(datanode);
+                          ExtendedBlock block,
+                          BlockChecksumOptions blockChecksumOptions)
+        throws IOException {
+      super(datanode, blockChecksumOptions);
       this.block = block;
       this.requestLength = block.getNumBytes();
       Preconditions.checkArgument(requestLength >= 0);
@@ -268,8 +284,10 @@ final class BlockChecksumHelper {
   static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer {
 
     ReplicatedBlockChecksumComputer(DataNode datanode,
-                                    ExtendedBlock block) throws IOException {
-      super(datanode, block);
+                                    ExtendedBlock block,
+                                    BlockChecksumOptions blockChecksumOptions)
+        throws IOException {
+      super(datanode, block, blockChecksumOptions);
     }
 
     @Override
@@ -277,22 +295,38 @@ final class BlockChecksumHelper {
       try {
         readHeader();
 
-        MD5Hash md5out;
-        if (isPartialBlk() && getCrcPerBlock() > 0) {
-          md5out = checksumPartialBlock();
-        } else {
-          md5out = checksumWholeBlock();
+        BlockChecksumType type =
+            getBlockChecksumOptions().getBlockChecksumType();
+        switch (type) {
+        case MD5CRC:
+          computeMd5Crc();
+          break;
+        case COMPOSITE_CRC:
+          computeCompositeCrc(getBlockChecksumOptions().getStripeLength());
+          break;
+        default:
+          throw new IOException(String.format(
+              "Unrecognized BlockChecksumType: %s", type));
         }
-        setOutBytes(md5out.getDigest());
-
-        LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
-            getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
       } finally {
         IOUtils.closeStream(getChecksumIn());
         IOUtils.closeStream(getMetadataIn());
       }
     }
 
+    private void computeMd5Crc() throws IOException {
+      MD5Hash md5out;
+      if (isPartialBlk() && getCrcPerBlock() > 0) {
+        md5out = checksumPartialBlock();
+      } else {
+        md5out = checksumWholeBlock();
+      }
+      setOutBytes(md5out.getDigest());
+
+      LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
+          getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
+    }
+
     private MD5Hash checksumWholeBlock() throws IOException {
       MD5Hash md5out = MD5Hash.digest(getChecksumIn());
       return md5out;
@@ -320,6 +354,56 @@ final class BlockChecksumHelper {
 
       return new MD5Hash(digester.digest());
     }
+
+    private void computeCompositeCrc(long stripeLength) throws IOException {
+      long checksumDataLength =
+          Math.min(getVisibleLength(), getRequestLength());
+      if (stripeLength <= 0 || stripeLength > checksumDataLength) {
+        stripeLength = checksumDataLength;
+      }
+
+      CrcComposer crcComposer = CrcComposer.newStripedCrcComposer(
+          getCrcType(), getBytesPerCRC(), stripeLength);
+      DataInputStream checksumIn = getChecksumIn();
+
+      // Whether or not we are checksumming the entire block (which itself may
+      // not be a full block size and may have a final chunk smaller than
+      // getBytesPerCRC()), we begin with a number of full chunks, all of size
+      // getBytesPerCRC().
+      long numFullChunks = checksumDataLength / getBytesPerCRC();
+      crcComposer.update(checksumIn, numFullChunks, getBytesPerCRC());
+
+      // There may be a final partial chunk that is not full-sized. Unlike the
+      // MD5 case, we still consider this a "partial chunk" even if
+      // getRequestLength() == getVisibleLength(), since the CRC composition
+      // depends on the byte size of that final chunk, even if it already has a
+      // precomputed CRC stored in metadata. So there are two cases:
+      //   1. Reading only part of a block via getRequestLength(); we get the
+      //      crcPartialBlock() explicitly.
+      //   2. Reading full visible length; the partial chunk already has a CRC
+      //      stored in block metadata, so we just continue reading checksumIn.
+      long partialChunkSize = checksumDataLength % getBytesPerCRC();
+      if (partialChunkSize > 0) {
+        if (isPartialBlk()) {
+          byte[] partialChunkCrcBytes = crcPartialBlock();
+          crcComposer.update(
+              partialChunkCrcBytes, 0, partialChunkCrcBytes.length,
+              partialChunkSize);
+        } else {
+          int partialChunkCrc = checksumIn.readInt();
+          crcComposer.update(partialChunkCrc, partialChunkSize);
+        }
+      }
+
+      byte[] composedCrcs = crcComposer.digest();
+      setOutBytes(composedCrcs);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "block={}, getBytesPerCRC={}, crcPerBlock={}, compositeCrc={}",
+            getBlock(), getBytesPerCRC(), getCrcPerBlock(),
+            CrcUtil.toMultiCrcString(composedCrcs));
+      }
+    }
   }
 
   /**
@@ -335,19 +419,29 @@ final class BlockChecksumHelper {
     private final byte[] blockIndices;
     private final long requestedNumBytes;
 
-    private final DataOutputBuffer md5writer = new DataOutputBuffer();
+    private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer();
+
+    // Keeps track of the positions within blockChecksumBuf where each data
+    // block's checksum begins; for fixed-size block checksums this is easily
+    // calculated as a multiple of the checksum size, but for striped block
+    // CRCs, it's less error-prone to simply keep track of exact byte offsets
+    // before each block checksum is populated into the buffer.
+    private final int[] blockChecksumPositions;
 
-    BlockGroupNonStripedChecksumComputer(DataNode datanode,
-                                         StripedBlockInfo stripedBlockInfo,
-                                         long requestedNumBytes)
+    BlockGroupNonStripedChecksumComputer(
+        DataNode datanode,
+        StripedBlockInfo stripedBlockInfo,
+        long requestedNumBytes,
+        BlockChecksumOptions blockChecksumOptions)
         throws IOException {
-      super(datanode);
+      super(datanode, blockChecksumOptions);
       this.blockGroup = stripedBlockInfo.getBlock();
       this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
       this.datanodes = stripedBlockInfo.getDatanodes();
       this.blockTokens = stripedBlockInfo.getBlockTokens();
       this.blockIndices = stripedBlockInfo.getBlockIndices();
       this.requestedNumBytes = requestedNumBytes;
+      this.blockChecksumPositions = new int[this.ecPolicy.getNumDataUnits()];
     }
 
     private static class LiveBlockInfo {
@@ -383,6 +477,9 @@ final class BlockChecksumHelper {
       }
       long checksumLen = 0;
       for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
+        // Before populating the blockChecksum at this index, record the byte
+        // offset where it will begin.
+        blockChecksumPositions[idx] = blockChecksumBuf.getLength();
         try {
           ExtendedBlock block = getInternalBlock(numDataUnits, idx);
 
@@ -409,8 +506,75 @@ final class BlockChecksumHelper {
         }
       }
 
-      MD5Hash md5out = MD5Hash.digest(md5writer.getData());
-      setOutBytes(md5out.getDigest());
+      BlockChecksumType type = getBlockChecksumOptions().getBlockChecksumType();
+      switch (type) {
+      case MD5CRC:
+        MD5Hash md5out = MD5Hash.digest(blockChecksumBuf.getData());
+        setOutBytes(md5out.getDigest());
+        break;
+      case COMPOSITE_CRC:
+        byte[] digest = reassembleNonStripedCompositeCrc(checksumLen);
+        setOutBytes(digest);
+        break;
+      default:
+        throw new IOException(String.format(
+            "Unrecognized BlockChecksumType: %s", type));
+      }
+    }
+
+    /**
+     * @param checksumLen The sum of bytes associated with the block checksum
+     *     data being digested into a block-group level checksum.
+     */
+    private byte[] reassembleNonStripedCompositeCrc(long checksumLen)
+        throws IOException {
+      int numDataUnits = ecPolicy.getNumDataUnits();
+      CrcComposer crcComposer = CrcComposer.newCrcComposer(
+          getCrcType(), ecPolicy.getCellSize());
+
+      // This should hold all the cell-granularity checksums of blk0
+      // followed by all cell checksums of blk1, etc. We must unstripe the
+      // cell checksums in order of logical file bytes. Also, note that the
+      // length of this array may not equal the number of actually valid
+      // bytes in the buffer (blockChecksumBuf.getLength()).
+      byte[] flatBlockChecksumData = blockChecksumBuf.getData();
+
+      // Initialize byte-level cursors to where each block's checksum begins
+      // inside the combined flattened buffer.
+      int[] blockChecksumCursors = new int[numDataUnits];
+      for (int idx = 0; idx < numDataUnits; ++idx) {
+        blockChecksumCursors[idx] = blockChecksumPositions[idx];
+      }
+
+      // Reassemble cell-level CRCs in the right order.
+      long numFullCells = checksumLen / ecPolicy.getCellSize();
+      for (long cellIndex = 0; cellIndex < numFullCells; ++cellIndex) {
+        int blockIndex = (int) (cellIndex % numDataUnits);
+        int checksumCursor = blockChecksumCursors[blockIndex];
+        int cellCrc = CrcUtil.readInt(
+            flatBlockChecksumData, checksumCursor);
+        blockChecksumCursors[blockIndex] += 4;
+        crcComposer.update(cellCrc, ecPolicy.getCellSize());
+      }
+      if (checksumLen % ecPolicy.getCellSize() != 0) {
+        // Final partial cell.
+        int blockIndex = (int) (numFullCells % numDataUnits);
+        int checksumCursor = blockChecksumCursors[blockIndex];
+        int cellCrc = CrcUtil.readInt(
+            flatBlockChecksumData, checksumCursor);
+        blockChecksumCursors[blockIndex] += 4;
+        crcComposer.update(cellCrc, checksumLen % ecPolicy.getCellSize());
+      }
+      byte[] digest = crcComposer.digest();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flatBlockChecksumData.length={}, numDataUnits={}, "
+            + "checksumLen={}, digest={}",
+            flatBlockChecksumData.length,
+            numDataUnits,
+            checksumLen,
+            CrcUtil.toSingleCrcString(digest));
+      }
+      return digest;
     }
 
     private ExtendedBlock getInternalBlock(int numDataUnits, int idx) {
@@ -437,8 +601,26 @@ final class BlockChecksumHelper {
         LOG.debug("write to {}: {}, block={}",
             getDatanode(), Op.BLOCK_CHECKSUM, block);
 
-        // get block MD5
-        createSender(pair).blockChecksum(block, blockToken);
+        // get block checksum
+        // A BlockGroupChecksum of type COMPOSITE_CRC uses underlying
+        // BlockChecksums also of type COMPOSITE_CRC but with
+        // stripeLength == ecPolicy.getCellSize().
+        BlockChecksumOptions childOptions;
+        BlockChecksumType groupChecksumType =
+            getBlockChecksumOptions().getBlockChecksumType();
+        switch (groupChecksumType) {
+        case MD5CRC:
+          childOptions = getBlockChecksumOptions();
+          break;
+        case COMPOSITE_CRC:
+          childOptions = new BlockChecksumOptions(
+              BlockChecksumType.COMPOSITE_CRC, ecPolicy.getCellSize());
+          break;
+        default:
+          throw new IOException(
+              "Unknown BlockChecksumType: " + groupChecksumType);
+        }
+        createSender(pair).blockChecksum(block, blockToken, childOptions);
 
         final DataTransferProtos.BlockOpResponseProto reply =
             DataTransferProtos.BlockOpResponseProto.parseFrom(
@@ -463,10 +645,37 @@ final class BlockChecksumHelper {
 
         setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(),
             checksumData.getCrcPerBlock(), ct);
-        //read md5
-        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
-        md5.write(md5writer);
-        LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5);
+
+        switch (groupChecksumType) {
+        case MD5CRC:
+          //read md5
+          final MD5Hash md5 =
+              new MD5Hash(checksumData.getBlockChecksum().toByteArray());
+          md5.write(blockChecksumBuf);
+          LOG.debug("got reply from datanode:{}, md5={}",
+              targetDatanode, md5);
+          break;
+        case COMPOSITE_CRC:
+          BlockChecksumType returnedType = PBHelperClient.convert(
+              checksumData.getBlockChecksumOptions().getBlockChecksumType());
+          if (returnedType != BlockChecksumType.COMPOSITE_CRC) {
+            throw new IOException(String.format(
+                "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC",
+                returnedType));
+          }
+          byte[] checksumBytes =
+              checksumData.getBlockChecksum().toByteArray();
+          blockChecksumBuf.write(checksumBytes, 0, checksumBytes.length);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("got reply from datanode:{} for blockIdx:{}, checksum:{}",
+                targetDatanode, blockIdx,
+                CrcUtil.toMultiCrcString(checksumBytes));
+          }
+          break;
+        default:
+          throw new IOException(
+              "Unknown BlockChecksumType: " + groupChecksumType);
+        }
       }
     }
 
@@ -489,10 +698,16 @@ final class BlockChecksumHelper {
       StripedReconstructionInfo stripedReconInfo =
           new StripedReconstructionInfo(
               blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
+      BlockChecksumType groupChecksumType =
+          getBlockChecksumOptions().getBlockChecksumType();
       final StripedBlockChecksumReconstructor checksumRecon =
-          new StripedBlockChecksumReconstructor(
+          groupChecksumType == BlockChecksumType.COMPOSITE_CRC ?
+          new StripedBlockChecksumCompositeCrcReconstructor(
               getDatanode().getErasureCodingWorker(), stripedReconInfo,
-              md5writer, blockLength);
+              blockChecksumBuf, blockLength) :
+          new StripedBlockChecksumMd5CrcReconstructor(
+              getDatanode().getErasureCodingWorker(), stripedReconInfo,
+              blockChecksumBuf, blockLength);
       checksumRecon.reconstruct();
 
       DataChecksum checksum = checksumRecon.getChecksum();
@@ -501,8 +716,8 @@ final class BlockChecksumHelper {
       setOrVerifyChecksumProperties(errBlkIndex,
           checksum.getBytesPerChecksum(), crcPerBlock,
           checksum.getChecksumType());
-      LOG.debug("Recalculated checksum for the block index:{}, md5={}",
-          errBlkIndex, checksumRecon.getMD5());
+      LOG.debug("Recalculated checksum for the block index:{}, checksum={}",
+          errBlkIndex, checksumRecon.getDigestObject());
     }
 
     private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
@@ -524,8 +739,16 @@ final class BlockChecksumHelper {
         setCrcType(ct);
       } else if (getCrcType() != DataChecksum.Type.MIXED &&
           getCrcType() != ct) {
-        // if crc types are mixed in a file
-        setCrcType(DataChecksum.Type.MIXED);
+        BlockChecksumType groupChecksumType =
+            getBlockChecksumOptions().getBlockChecksumType();
+        if (groupChecksumType == BlockChecksumType.COMPOSITE_CRC) {
+          throw new IOException(String.format(
+              "BlockChecksumType COMPOSITE_CRC doesn't support MIXED "
+              + "underlying types; previous block was %s, next block is %s",
+              getCrcType(), ct));
+        } else {
+          setCrcType(DataChecksum.Type.MIXED);
+        }
       }
 
       if (blockIdx == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index d0ded89..0bb1987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -968,15 +969,16 @@ class DataXceiver extends Receiver implements Runnable {
 
   @Override
   public void blockChecksum(ExtendedBlock block,
-                            Token<BlockTokenIdentifier> blockToken)
+      Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions)
       throws IOException {
     updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM,
         BlockTokenIdentifier.AccessMode.READ);
-    BlockChecksumComputer maker =
-        new ReplicatedBlockChecksumComputer(datanode, block);
+    BlockChecksumComputer maker = new ReplicatedBlockChecksumComputer(
+        datanode, block, blockChecksumOptions);
 
     try {
       maker.compute();
@@ -987,8 +989,10 @@ class DataXceiver extends Receiver implements Runnable {
           .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
               .setBytesPerCrc(maker.getBytesPerCRC())
               .setCrcPerBlock(maker.getCrcPerBlock())
-              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
-              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+              .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType()))
+              .setBlockChecksumOptions(
+                  PBHelperClient.convert(blockChecksumOptions)))
           .build()
           .writeDelimitedTo(out);
       out.flush();
@@ -1007,7 +1011,9 @@ class DataXceiver extends Receiver implements Runnable {
 
   @Override
   public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
-      final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
+      final Token<BlockTokenIdentifier> blockToken,
+      long requestedNumBytes,
+      BlockChecksumOptions blockChecksumOptions)
       throws IOException {
     final ExtendedBlock block = stripedBlockInfo.getBlock();
     updateCurrentThreadName("Getting checksum for block group" +
@@ -1018,7 +1024,7 @@ class DataXceiver extends Receiver implements Runnable {
 
     AbstractBlockChecksumComputer maker =
         new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
-            requestedNumBytes);
+            requestedNumBytes, blockChecksumOptions);
 
     try {
       maker.compute();
@@ -1029,8 +1035,10 @@ class DataXceiver extends Receiver implements Runnable {
           .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
               .setBytesPerCrc(maker.getBytesPerCRC())
               .setCrcPerBlock(maker.getCrcPerBlock())
-              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
-              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+              .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType()))
+              .setBlockChecksumOptions(
+                  PBHelperClient.convert(blockChecksumOptions)))
           .build()
           .writeDelimitedTo(out);
       out.flush();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java
new file mode 100644
index 0000000..afcc8cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.CrcComposer;
+
+/**
+ * Computes striped composite CRCs over reconstructed chunk CRCs.
+ *
+ * <p>The {@link CrcComposer} is created by {@link #prepareDigester()} with
+ * the erasure coding policy's cell size as its stripe length, fed through
+ * {@link #updateDigester(byte[], int)}, and finalized by
+ * {@link #commitDigest()}.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumCompositeCrcReconstructor
+    extends StripedBlockChecksumReconstructor {
+  private final int ecPolicyCellSize;
+
+  // Bytes produced by the last commitDigest(); null before that.
+  private byte[] digestValue;
+  // Created by prepareDigester(); null before that.
+  private CrcComposer digester;
+
+  /**
+   * Forwards all arguments to the superclass and remembers the cell size of
+   * the erasure coding policy carried by {@code stripedReconInfo}.
+   *
+   * @throws IOException propagated from the superclass constructor
+   */
+  public StripedBlockChecksumCompositeCrcReconstructor(
+      ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo,
+      DataOutputBuffer checksumWriter,
+      long requestedBlockLength) throws IOException {
+    super(worker, stripedReconInfo, checksumWriter, requestedBlockLength);
+    this.ecPolicyCellSize = stripedReconInfo.getEcPolicy().getCellSize();
+  }
+
+  /**
+   * @return the digest bytes stored by {@link #commitDigest()}, or null if
+   *     it has not been called yet
+   */
+  @Override
+  public Object getDigestObject() {
+    return digestValue;
+  }
+
+  /**
+   * Creates a striped composer from the checksum type and bytes-per-checksum
+   * of {@code getChecksum()}, using the EC cell size as the stripe length.
+   */
+  @Override
+  void prepareDigester() throws IOException {
+    digester = CrcComposer.newStripedCrcComposer(
+        getChecksum().getChecksumType(),
+        getChecksum().getBytesPerChecksum(),
+        ecPolicyCellSize);
+  }
+
+  /**
+   * Feeds {@code checksumBytes} into the composer.
+   *
+   * @param checksumBytes CRC bytes to compose; the whole array is consumed
+   * @param dataBytesPerChecksum passed to the composer as the data length
+   *     associated with each CRC (see CrcComposer#update to confirm)
+   * @throws IOException if no digester has been prepared yet
+   */
+  @Override
+  void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException {
+    if (digester == null) {
+      throw new IOException(String.format(
+          "Called updateDigester with checksumBytes.length=%d, "
+          + "dataBytesPerChecksum=%d but digester is null",
+          checksumBytes.length, dataBytesPerChecksum));
+    }
+    digester.update(
+        checksumBytes, 0, checksumBytes.length, dataBytesPerChecksum);
+  }
+
+  /**
+   * Finalizes the composer, stores the result for
+   * {@link #getDigestObject()}, and writes it to {@code getChecksumWriter()}.
+   *
+   * @throws IOException if no digester has been prepared yet
+   */
+  @Override
+  void commitDigest() throws IOException {
+    if (digester == null) {
+      throw new IOException("Called commitDigest() but digester is null");
+    }
+    digestValue = digester.digest();
+    getChecksumWriter().write(digestValue, 0, digestValue.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java
new file mode 100644
index 0000000..2f809b8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * Computes running MD5-of-CRC over reconstructed chunk CRCs.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumMd5CrcReconstructor
+    extends StripedBlockChecksumReconstructor {
+  private MD5Hash md5;  // final digest; assigned by commitDigest()
+  private MessageDigest digester;  // assigned by prepareDigester()
+
+  public StripedBlockChecksumMd5CrcReconstructor(ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo,
+      DataOutputBuffer checksumWriter,
+      long requestedBlockLength) throws IOException {
+    super(worker, stripedReconInfo, checksumWriter, requestedBlockLength);
+  }
+
+  @Override
+  public Object getDigestObject() {
+    return md5;
+  }
+
+  @Override
+  void prepareDigester() throws IOException {
+    digester = MD5Hash.getDigester();
+  }
+
+  @Override
+  void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException {
+    if (digester == null) {
+      throw new IOException(String.format(
+          "Called updateDigester with checksumBytes.length=%d, "
+          + "dataBytesPerChecksum=%d but digester is null",
+          checksumBytes.length, dataBytesPerChecksum));
+    }
+    digester.update(checksumBytes, 0, checksumBytes.length);
+  }
+
+  @Override
+  void commitDigest() throws IOException {
+    if (digester == null) {
+      throw new IOException("Called commitDigest() but digester is null");
+    }
+    byte[] digest = digester.digest();
+    md5 = new MD5Hash(digest);
+    md5.write(getChecksumWriter());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index d530a8e..b2e6496 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MD5Hash;
 
 /**
  * StripedBlockChecksumReconstructor reconstruct one or more missed striped
@@ -33,18 +31,17 @@ import org.apache.hadoop.io.MD5Hash;
  * using the newly reconstructed block.
  */
 @InterfaceAudience.Private
-public class StripedBlockChecksumReconstructor extends StripedReconstructor {
-
+public abstract class StripedBlockChecksumReconstructor
+    extends StripedReconstructor {
   private ByteBuffer targetBuffer;
   private final byte[] targetIndices;
 
   private byte[] checksumBuf;
   private DataOutputBuffer checksumWriter;
-  private MD5Hash md5;
   private long checksumDataLen;
   private long requestedLen;
 
-  public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
+  protected StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo,
       DataOutputBuffer checksumWriter,
       long requestedBlockLength) throws IOException {
@@ -72,8 +69,9 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
     checksumBuf = new byte[tmpLen];
   }
 
+  @Override
   public void reconstruct() throws IOException {
-    MessageDigest digester = MD5Hash.getDigester();
+    prepareDigester();
     long maxTargetLength = getMaxTargetLength();
     try {
       while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
@@ -88,24 +86,54 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         reconstructTargets(toReconstructLen);
 
         // step3: calculate checksum
-        checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
-            toReconstructLen, digester);
+        checksumDataLen += checksumWithTargetOutput(
+            targetBuffer.array(), toReconstructLen);
 
         updatePositionInBlock(toReconstructLen);
         requestedLen -= toReconstructLen;
         clearBuffers();
       }
 
-      byte[] digest = digester.digest();
-      md5 = new MD5Hash(digest);
-      md5.write(checksumWriter);
+      commitDigest();
     } finally {
       cleanup();
     }
   }
 
-  private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
-      MessageDigest digester) throws IOException {
+  /**
+   * Should return a representation of a completed/reconstructed digest which
+   * is suitable for debug printing.
+   */
+  public abstract Object getDigestObject();
+
+  /**
+   * This will be called before starting reconstruction.
+   */
+  abstract void prepareDigester() throws IOException;
+
+  /**
+   * This will be called repeatedly with chunked checksums computed in-flight
+   * over reconstructed data.
+   *
+   * @param dataBytesPerChecksum the number of underlying data bytes
+   *     corresponding to each checksum inside {@code checksumBytes}.
+   */
+  abstract void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException;
+
+  /**
+   * This will be called when reconstruction of entire requested length is
+   * complete and any final digests should be committed to
+   * implementation-specific output fields.
+   */
+  abstract void commitDigest() throws IOException;
+
+  protected DataOutputBuffer getChecksumWriter() {
+    return checksumWriter;
+  }
+
+  private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen)
+      throws IOException {
     long checksumDataLength = 0;
     // Calculate partial block checksum. There are two cases.
     // case-1) length of data bytes which is fraction of bytesPerCRC
@@ -128,7 +156,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         checksumBuf = new byte[checksumRemaining];
         getChecksum().calculateChunkedSums(outputData, dataOffset,
             remainingLen, checksumBuf, 0);
-        digester.update(checksumBuf, 0, checksumBuf.length);
+        updateDigester(checksumBuf, getChecksum().getBytesPerChecksum());
         checksumDataLength = checksumBuf.length;
         dataOffset = remainingLen;
       }
@@ -139,7 +167,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         getChecksum().reset();
         getChecksum().update(outputData, dataOffset, partialLength);
         getChecksum().writeValue(partialCrc, 0, true);
-        digester.update(partialCrc);
+        updateDigester(partialCrc, partialLength);
         checksumDataLength += partialCrc.length;
       }
 
@@ -151,7 +179,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         outputData.length, checksumBuf, 0);
 
     // updates digest using the checksum array of bytes
-    digester.update(checksumBuf, 0, checksumBuf.length);
+    updateDigester(checksumBuf, getChecksum().getBytesPerChecksum());
     return checksumBuf.length;
   }
 
@@ -176,10 +204,6 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
     targetBuffer.clear();
   }
 
-  public MD5Hash getMD5() {
-    return md5;
-  }
-
   public long getChecksumDataLen() {
     return checksumDataLen;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index ce1254c..29c0078 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -79,6 +79,7 @@ class StripedBlockReconstructor extends StripedReconstructor
     }
   }
 
+  @Override
   void reconstruct() throws IOException {
     while (getPositionInBlock() < getMaxTargetLength()) {
       DataNodeFaultInjector.get().stripedBlockReconstruction();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1e83325..f8cce60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3541,6 +3541,17 @@
 </property>
 
 <property>
+  <name>dfs.checksum.combine.mode</name>
+  <value>MD5MD5CRC</value>
+  <description>
+    Defines how lower-level chunk/block checksums are combined into file-level
+    checksums; the original MD5MD5CRC mode is not comparable between files
+    with different block layouts, while modes like COMPOSITE_CRC are
+    comparable independently of block layout.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.block.write.locateFollowingBlock.retries</name>
   <value>5</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 4f9f260..adede5f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -857,6 +857,20 @@ public class DFSTestUtil {
     }
   }
 
+  /* Write the given bytes to the given file using the specified blockSize */
+  public static void writeFile(
+      FileSystem fs, Path p, byte[] bytes, long blockSize)
+      throws IOException {
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+    try (InputStream is = new ByteArrayInputStream(bytes);
+        FSDataOutputStream os = fs.create(
+            p, false, 4096, fs.getDefaultReplication(p), blockSize)) {
+      IOUtils.copyBytes(is, os, bytes.length);
+    }
+  }
+
   /* Write the given string to the given file */
   public static void writeFile(FileSystem fs, Path p, String s)
       throws IOException {
@@ -901,14 +915,27 @@ public class DFSTestUtil {
    */
   public static void appendFileNewBlock(DistributedFileSystem fs,
       Path p, int length) throws IOException {
-    assert fs.exists(p);
     assert length >= 0;
     byte[] toAppend = new byte[length];
     Random random = new Random();
     random.nextBytes(toAppend);
+    appendFileNewBlock(fs, p, toAppend);
+  }
+
+  /**
+   * Append specified bytes to a given file, starting with new block.
+   *
+   * @param fs The file system
+   * @param p Path of the file to append
+   * @param bytes The data to append
+   * @throws IOException
+   */
+  public static void appendFileNewBlock(DistributedFileSystem fs,
+      Path p, byte[] bytes) throws IOException {
+    assert fs.exists(p);
     try (FSDataOutputStream out = fs.append(p,
         EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
-      out.write(toAppend);
+      out.write(bytes);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
index d201ce1..0ff2d4b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -30,7 +31,9 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -74,6 +77,9 @@ public class TestFileChecksum {
   private String stripedFile2 = ecDir + "/stripedFileChecksum2";
   private String replicatedFile = "/replicatedFileChecksum";
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   @Before
   public void setup() throws IOException {
     int numDNs = dataBlocks + parityBlocks + 2;
@@ -83,6 +89,7 @@ public class TestFileChecksum {
         false);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    customizeConf(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     Path ecPath = new Path(ecDir);
     cluster.getFileSystem().mkdir(ecPath, FsPermission.getDirDefault());
@@ -106,6 +113,39 @@ public class TestFileChecksum {
     }
   }
 
+  /**
+   * Subclasses may customize the conf to run the full set of tests under
+   * different conditions.
+   */
+  protected void customizeConf(Configuration preparedConf) {
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether equivalent files
+   * in striped and replicated formats are expected to have the same
+   * overall FileChecksum.
+   */
+  protected boolean expectComparableStripedAndReplicatedFiles() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether equivalent files
+   * in replicated formats with different block sizes are expected to have the
+   * same overall FileChecksum.
+   */
+  protected boolean expectComparableDifferentBlockSizeReplicatedFiles() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether checksums are
+   * supported for files where different blocks have different bytesPerCRC.
+   */
+  protected boolean expectSupportForSingleFileMixedBytesPerChecksum() {
+    return false;
+  }
+
   @Test(timeout = 90000)
   public void testStripedFileChecksum1() throws Exception {
     int length = 0;
@@ -182,7 +222,30 @@ public class TestFileChecksum {
     FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
         10, false);
 
-    Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
+    if (expectComparableStripedAndReplicatedFiles()) {
+      Assert.assertEquals(stripedFileChecksum1, replicatedFileChecksum);
+    } else {
+      Assert.assertNotEquals(stripedFileChecksum1, replicatedFileChecksum);
+    }
+  }
+
+  @Test(timeout = 90000)
+  public void testDifferentBlockSizeReplicatedFileChecksum() throws Exception {
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
+    String replicatedFile1 = "/replicatedFile1";
+    String replicatedFile2 = "/replicatedFile2";
+    DFSTestUtil.writeFile(
+        fs, new Path(replicatedFile1), fileData, blockSize);
+    DFSTestUtil.writeFile(
+        fs, new Path(replicatedFile2), fileData, blockSize / 2);
+    FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
+    FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
+
+    if (expectComparableDifferentBlockSizeReplicatedFiles()) {
+      Assert.assertEquals(checksum1, checksum2);
+    } else {
+      Assert.assertNotEquals(checksum1, checksum2);
+    }
   }
 
   @Test(timeout = 90000)
@@ -471,6 +534,40 @@ public class TestFileChecksum {
         bytesPerCRC - 1);
   }
 
+  @Test(timeout = 90000)
+  public void testMixedBytesPerChecksum() throws Exception {
+    int fileLength = bytesPerCRC * 3;
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
+    String replicatedFile1 = "/replicatedFile1";
+
+    // Split file into two parts.
+    byte[] fileDataPart1 = new byte[bytesPerCRC * 2];
+    System.arraycopy(fileData, 0, fileDataPart1, 0, fileDataPart1.length);
+    byte[] fileDataPart2 = new byte[fileData.length - fileDataPart1.length];
+    System.arraycopy(
+        fileData, fileDataPart1.length, fileDataPart2, 0, fileDataPart2.length);
+
+    DFSTestUtil.writeFile(fs, new Path(replicatedFile1), fileDataPart1);
+
+    // Modify bytesPerCRC for second part that we append as separate block.
+    conf.setInt(
+        HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesPerCRC / 2);
+    DFSTestUtil.appendFileNewBlock(
+        ((DistributedFileSystem) FileSystem.newInstance(conf)),
+        new Path(replicatedFile1), fileDataPart2);
+
+    if (expectSupportForSingleFileMixedBytesPerChecksum()) {
+      String replicatedFile2 = "/replicatedFile2";
+      DFSTestUtil.writeFile(fs, new Path(replicatedFile2), fileData);
+      FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
+      FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
+      Assert.assertEquals(checksum1, checksum2);
+    } else {
+      exception.expect(IOException.class);
+      FileChecksum checksum = getFileChecksum(replicatedFile1, -1, false);
+    }
+  }
+
   private FileChecksum getFileChecksum(String filePath, int range,
                                        boolean killDn) throws Exception {
     int dnIdxToDie = -1;
@@ -537,4 +634,4 @@ public class TestFileChecksum {
 
     return -1;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
new file mode 100644
index 0000000..87fb7da
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+/**
+ * End-to-end tests for COMPOSITE_CRC combine mode.
+ */
+public class TestFileChecksumCompositeCrc extends TestFileChecksum {
+  @Override
+  protected void customizeConf(Configuration conf) {
+    conf.set(
+        HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
+  }
+
+  @Override
+  protected boolean expectComparableStripedAndReplicatedFiles() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectComparableDifferentBlockSizeReplicatedFiles() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectSupportForSingleFileMixedBytesPerChecksum() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index da56c15..22f84c5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -683,6 +684,19 @@ public class TestPBHelper {
   }
 
   @Test
+  public void testBlockChecksumTypeProto() {
+    assertEquals(BlockChecksumType.MD5CRC,
+        PBHelperClient.convert(HdfsProtos.BlockChecksumTypeProto.MD5CRC));
+    assertEquals(BlockChecksumType.COMPOSITE_CRC,
+        PBHelperClient.convert(
+            HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC));
+    assertEquals(PBHelperClient.convert(BlockChecksumType.MD5CRC),
+        HdfsProtos.BlockChecksumTypeProto.MD5CRC);
+    assertEquals(PBHelperClient.convert(BlockChecksumType.COMPOSITE_CRC),
+        HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC);
+  }
+
+  @Test
   public void testAclEntryProto() {
     // All fields populated.
     AclEntry e1 = new AclEntry.Builder().setName("test")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
 
b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
index bbe7e8d..30e4683 100644
--- 
a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
+++ 
b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
@@ -73,20 +73,42 @@ public class TestCopyMapper {
   private static final String SOURCE_PATH = "/tmp/source";
   private static final String TARGET_PATH = "/tmp/target";
 
-  private static Configuration configuration;
-
   @BeforeClass
   public static void setup() throws Exception {
-    configuration = getConfigurationForCluster();
-    cluster = new MiniDFSCluster.Builder(configuration)
+    Configuration configuration = getConfigurationForCluster();
+    setCluster(new MiniDFSCluster.Builder(configuration)
                 .numDataNodes(1)
                 .format(true)
-                .build();
+                .build());
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether copying files with
+   * non-default block sizes without setting BLOCKSIZE as a preserved attribute
+   * is expected to succeed with CRC checks enabled.
+   */
+  protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether copying files with
+   * non-default bytes-per-crc without setting CHECKSUMTYPE as a preserved
+   * attribute is expected to succeed with CRC checks enabled.
+   */
+  protected boolean expectDifferentBytesPerCrcToSucceed() {
+    return false;
+  }
+
+  protected static void setCluster(MiniDFSCluster c) {
+    cluster = c;
   }
 
-  private static Configuration getConfigurationForCluster() throws IOException {
+  protected static Configuration getConfigurationForCluster()
+      throws IOException {
     Configuration configuration = new Configuration();
-    System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+    System.setProperty(
+        "test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
     configuration.set("hadoop.log.dir", "target/tmp");
     configuration.set("dfs.namenode.fs-limits.min-block-size", "0");
     LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
@@ -136,7 +158,8 @@ public class TestCopyMapper {
     }
   }
 
-  private static void createSourceDataWithDifferentBlockSize() throws Exception {
+  private static void createSourceDataWithDifferentBlockSize()
+      throws Exception {
     mkdirs(SOURCE_PATH + "/1");
     mkdirs(SOURCE_PATH + "/2");
     mkdirs(SOURCE_PATH + "/2/3/4");
@@ -163,6 +186,21 @@ public class TestCopyMapper {
         512));
   }
 
+  private static void createSourceDataWithDifferentBytesPerCrc()
+      throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6", false,
+        new ChecksumOpt(DataChecksum.Type.CRC32C, 32));
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9", false,
+        new ChecksumOpt(DataChecksum.Type.CRC32C, 64));
+  }
+
   private static void mkdirs(String path) throws Exception {
     FileSystem fileSystem = cluster.getFileSystem();
     final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@@ -281,7 +319,7 @@ public class TestCopyMapper {
                   path)), context);
     }
 
-    verifyCopy(fs, false);
+    verifyCopy(fs, false, true);
     // verify that we only copied new appended data
     Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
         .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
@@ -317,6 +355,11 @@ public class TestCopyMapper {
     EnumSet<DistCpOptions.FileAttribute> fileAttributes
             = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
     if (preserveChecksum) {
+      // We created source files with both different checksum types and
+      // non-default block sizes; here we don't explicitly add BLOCKSIZE
+      // as a preserved attribute, but the current behavior is that
+      // preserving CHECKSUMTYPE also automatically implies preserving
+      // BLOCKSIZE.
       fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
     }
     configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
@@ -339,7 +382,7 @@ public class TestCopyMapper {
     }
 
     // Check that the maps worked.
-    verifyCopy(fs, preserveChecksum);
+    verifyCopy(fs, preserveChecksum, true);
     Assert.assertEquals(numFiles, stubContext.getReporter()
         .getCounter(CopyMapper.Counter.COPY).getValue());
     Assert.assertEquals(numDirs, stubContext.getReporter()
@@ -361,7 +404,8 @@ public class TestCopyMapper {
     }
   }
 
-  private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+  private void verifyCopy(
+      FileSystem fs, boolean preserveChecksum, boolean preserveReplication)
       throws Exception {
     for (Path path : pathList) {
       final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
@@ -370,8 +414,10 @@ public class TestCopyMapper {
       Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
       FileStatus sourceStatus = fs.getFileStatus(path);
       FileStatus targetStatus = fs.getFileStatus(targetPath);
-      Assert.assertEquals(sourceStatus.getReplication(),
-          targetStatus.getReplication());
+      if (preserveReplication) {
+        Assert.assertEquals(sourceStatus.getReplication(),
+            targetStatus.getReplication());
+      }
       if (preserveChecksum) {
         Assert.assertEquals(sourceStatus.getBlockSize(),
             targetStatus.getBlockSize());
@@ -505,7 +551,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -574,7 +620,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -649,7 +695,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -730,7 +776,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered ", e);
             Assert.fail("Test failed: " + e.getMessage());
@@ -887,7 +933,7 @@ public class TestCopyMapper {
         @Override
         public FileSystem run() {
           try {
-            return FileSystem.get(configuration);
+            return FileSystem.get(cluster.getConfiguration(0));
           } catch (IOException e) {
             LOG.error("Exception encountered when get FileSystem.", e);
             throw new RuntimeException(e);
@@ -938,12 +984,13 @@ public class TestCopyMapper {
   }
 
   @Test(timeout=40000)
-  public void testCopyFailOnBlockSizeDifference() throws Exception {
+  public void testCopyWithDifferentBlockSizes() throws Exception {
     try {
       deleteState();
       createSourceDataWithDifferentBlockSize();
 
       FileSystem fs = cluster.getFileSystem();
+
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
       Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
@@ -959,17 +1006,79 @@ public class TestCopyMapper {
 
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
-            path)), new CopyListingFileStatus(fileStatus), context);
+        copyMapper.map(
+            new Text(
+                DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            new CopyListingFileStatus(fileStatus), context);
       }
 
-      Assert.fail("Copy should have failed because of block-size difference.");
+      if (expectDifferentBlockSizesMultipleBlocksToSucceed()) {
+        verifyCopy(fs, false, false);
+      } else {
+        Assert.fail(
+            "Copy should have failed because of block-size difference.");
+      }
+    } catch (Exception exception) {
+      if (expectDifferentBlockSizesMultipleBlocksToSucceed()) {
+        throw exception;
+      } else {
+        // Check that the exception suggests the use of -pb/-skipcrccheck.
+        // This could be refactored to use LambdaTestUtils if we add support
+        // for listing multiple different independent substrings to expect
+        // in the exception message and add support for LambdaTestUtils to
+        // inspect the transitive cause and/or suppressed exceptions as well.
+        Throwable cause = exception.getCause().getCause();
+        GenericTestUtils.assertExceptionContains("-pb", cause);
+        GenericTestUtils.assertExceptionContains("-skipcrccheck", cause);
+      }
     }
-    catch (IOException exception) {
-      // Check that the exception suggests the use of -pb/-skipcrccheck.
-      Throwable cause = exception.getCause().getCause();
-      GenericTestUtils.assertExceptionContains("-pb", cause);
-      GenericTestUtils.assertExceptionContains("-skipcrccheck", cause);
+  }
+
+  @Test(timeout=40000)
+  public void testCopyWithDifferentBytesPerCrc() throws Exception {
+    try {
+      deleteState();
+      createSourceDataWithDifferentBytesPerCrc();
+
+      FileSystem fs = cluster.getFileSystem();
+
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+          DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(
+            new Text(
+                DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            new CopyListingFileStatus(fileStatus), context);
+      }
+
+      if (expectDifferentBytesPerCrcToSucceed()) {
+        verifyCopy(fs, false, false);
+      } else {
+        Assert.fail(
+            "Copy should have failed because of bytes-per-crc difference.");
+      }
+    } catch (Exception exception) {
+      if (expectDifferentBytesPerCrcToSucceed()) {
+        throw exception;
+      } else {
+        // This could be refactored to use LambdaTestUtils if we add support
+        // for LambdaTestUtils to inspect the transitive cause and/or
+        // suppressed exceptions as well.
+        Throwable cause = exception.getCause().getCause();
+        GenericTestUtils.assertExceptionContains("mismatch", cause);
+      }
     }
   }
 
@@ -980,6 +1089,7 @@ public class TestCopyMapper {
       createSourceData();
 
       FileSystem fs = cluster.getFileSystem();
+
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
       Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
@@ -1010,6 +1120,12 @@ public class TestCopyMapper {
         final FileStatus source = fs.getFileStatus(path);
         final FileStatus target = fs.getFileStatus(targetPath);
         if (!source.isDirectory() ) {
+          // The reason the checksum check succeeds despite block sizes not
+          // matching between the two is that when only one block is ever
+          // written (partial or complete), the crcPerBlock is not included
+          // in the FileChecksum algorithmName. If we had instead written
+          // a large enough file to exceed the blocksize, then the copy
+          // would not have succeeded.
           Assert.assertTrue(preserve ||
                   source.getBlockSize() != target.getBlockSize());
           Assert.assertTrue(preserve ||
@@ -1020,8 +1136,7 @@ public class TestCopyMapper {
                   source.getReplication() == target.getReplication());
         }
       }
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       Assert.assertTrue("Unexpected exception: " + e.getMessage(), false);
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java
new file mode 100644
index 0000000..6ed86e3
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+import org.junit.BeforeClass;
+
+/**
+ * End-to-end tests for COMPOSITE_CRC combine mode.
+ */
+public class TestCopyMapperCompositeCrc extends TestCopyMapper {
+  @BeforeClass
+  public static void setup() throws Exception {
+    Configuration configuration = TestCopyMapper.getConfigurationForCluster();
+    configuration.set(
+        HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
+    TestCopyMapper.setCluster(new MiniDFSCluster.Builder(configuration)
+                .numDataNodes(1)
+                .format(true)
+                .build());
+  }
+
+  @Override
+  protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectDifferentBytesPerCrcToSucceed() {
+    return true;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to