http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 2356201..384da54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -148,8 +148,9 @@ message OpCopyBlockProto {
   required BaseHeaderProto header = 1;
 }
 
-message OpBlockChecksumProto {
+message OpBlockChecksumProto {
   required BaseHeaderProto header = 1;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 2;
 }
 
 message OpBlockGroupChecksumProto {
@@ -160,6 +161,7 @@ message OpBlockGroupChecksumProto {
   required ErasureCodingPolicyProto ecPolicy = 4;
   repeated uint32 blockIndices = 5;
   required uint64 requestedNumBytes = 6;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 7;
 }
 
 /**
@@ -313,8 +315,9 @@ message DNTransferAckProto {
 message OpBlockChecksumResponseProto {
   required uint32 bytesPerCrc = 1;
   required uint64 crcPerBlock = 2;
-  required bytes md5 = 3;
+  required bytes blockChecksum = 3;
   optional ChecksumTypeProto crcType = 4;
+  optional BlockChecksumOptionsProto blockChecksumOptions = 5;
 }
 
 message OpCustomProto {
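To see how these message changes fit together: a requester fills in the new optional blockChecksumOptions on the op, and the response echoes the options alongside the renamed blockChecksum payload. A minimal sketch, assuming the Java classes protoc generates from the definitions above (baseHeader, cellSize, and replyBytes are placeholders built elsewhere):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockChecksumOptionsProto;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockChecksumTypeProto;

    // Build a request asking for a composite CRC instead of the MD5-of-CRCs.
    OpBlockChecksumProto request = OpBlockChecksumProto.newBuilder()
        .setHeader(baseHeader)  // placeholder BaseHeaderProto
        .setBlockChecksumOptions(BlockChecksumOptionsProto.newBuilder()
            .setBlockChecksumType(BlockChecksumTypeProto.COMPOSITE_CRC)
            .setStripeLength(cellSize))  // only meaningful for striped formats
        .build();

    // Read the renamed field from a reply; field 3 now carries arbitrary
    // checksum bytes whose interpretation depends on blockChecksumOptions.
    OpBlockChecksumResponseProto reply =
        OpBlockChecksumResponseProto.parseFrom(replyBytes);  // placeholder bytes
    byte[] blockChecksum = reply.getBlockChecksum().toByteArray();

Note that renaming md5 to blockChecksum keeps the same field number (3), so the wire encoding is unchanged for MD5CRC-type responses.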
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 29d0b4e..441b9d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -480,6 +480,27 @@ enum ChecksumTypeProto {
   CHECKSUM_CRC32C = 2;
 }
 
+enum BlockChecksumTypeProto {
+  MD5CRC = 1;  // BlockChecksum obtained by taking the MD5 digest of chunk CRCs
+  COMPOSITE_CRC = 2;  // Chunk-independent CRC, optionally striped
+}
+
+/**
+ * Algorithms/types denoting how block-level checksums are computed using
+ * lower-level chunk checksums/CRCs.
+ * These options should be kept in sync with
+ * org.apache.hadoop.hdfs.protocol.BlockChecksumOptions.
+ */
+message BlockChecksumOptionsProto {
+  optional BlockChecksumTypeProto blockChecksumType = 1 [default = MD5CRC];
+
+  // Only used if blockChecksumType specifies a striped format, such as
+  // COMPOSITE_CRC. If so, then the blockChecksum in the response is expected
+  // to be the concatenation of N crcs, where
+  // N == ((requestedLength - 1) / stripeLength) + 1
+  optional uint64 stripeLength = 2;
+}
+
 /**
  * HDFS Server Defaults
  */
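The N formula above is a ceiling division: one CRC per full or partial stripe. A small worked example (values invented):

    // For a 10 MB requested length and a 4 MB stripeLength:
    long requestedLength = 10L * 1024 * 1024;
    long stripeLength = 4L * 1024 * 1024;
    // Stripes cover [0, 4MB), [4MB, 8MB), [8MB, 10MB) -- the last is partial.
    long n = ((requestedLength - 1) / stripeLength) + 1;  // n == 3
    // The response's blockChecksum would then be 3 concatenated 4-byte CRCs
    // (12 bytes total), one per stripe.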
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index bab2e8d..5d2d1f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -301,8 +301,9 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
-          PBHelperClient.convert(proto.getHeader().getToken()));
+      blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
+          PBHelperClient.convert(proto.getHeader().getToken()),
+          PBHelperClient.convert(proto.getBlockChecksumOptions()));
     } finally {
       if (traceScope != null) traceScope.close();
     }
@@ -325,7 +326,8 @@ public abstract class Receiver implements DataTransferProtocol {
     try {
       blockGroupChecksum(stripedBlockInfo,
           PBHelperClient.convert(proto.getHeader().getToken()),
-          proto.getRequestedNumBytes());
+          proto.getRequestedNumBytes(),
+          PBHelperClient.convert(proto.getBlockChecksumOptions()));
     } finally {
       if (traceScope != null) {
         traceScope.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index e99911b..3388855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -32,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumCompositeCrcReconstructor;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumMd5CrcReconstructor;
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor;
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -40,6 +44,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +76,7 @@ final class BlockChecksumHelper {
    */
   static abstract class AbstractBlockChecksumComputer {
     private final DataNode datanode;
+    private final BlockChecksumOptions blockChecksumOptions;
 
     private byte[] outBytes;
     private int bytesPerCRC = -1;
@@ -77,8 +84,11 @@ final class BlockChecksumHelper {
     private long crcPerBlock = -1;
     private int checksumSize = -1;
 
-    AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
+    AbstractBlockChecksumComputer(
+        DataNode datanode,
+        BlockChecksumOptions blockChecksumOptions) throws IOException {
       this.datanode = datanode;
+      this.blockChecksumOptions = blockChecksumOptions;
     }
 
     abstract void compute() throws IOException;
@@ -92,6 +102,10 @@ final class BlockChecksumHelper {
       return datanode;
     }
 
+    BlockChecksumOptions getBlockChecksumOptions() {
+      return blockChecksumOptions;
+    }
+
     InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
         throws IOException {
       return datanode.data.getBlockInputStream(block, seekOffset);
@@ -155,8 +169,10 @@ final class BlockChecksumHelper {
     private DataChecksum checksum;
 
     BlockChecksumComputer(DataNode datanode,
-        ExtendedBlock block) throws IOException {
-      super(datanode);
+        ExtendedBlock block,
+        BlockChecksumOptions blockChecksumOptions)
+        throws IOException {
+      super(datanode, blockChecksumOptions);
       this.block = block;
       this.requestLength = block.getNumBytes();
       Preconditions.checkArgument(requestLength >= 0);
@@ -268,8 +284,10 @@ final class BlockChecksumHelper {
   static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer {
 
     ReplicatedBlockChecksumComputer(DataNode datanode,
-        ExtendedBlock block) throws IOException {
-      super(datanode, block);
+        ExtendedBlock block,
+        BlockChecksumOptions blockChecksumOptions)
+        throws IOException {
+      super(datanode, block, blockChecksumOptions);
     }
 
     @Override
@@ -277,22 +295,38 @@ final class BlockChecksumHelper {
       try {
         readHeader();
 
-        MD5Hash md5out;
-        if (isPartialBlk() && getCrcPerBlock() > 0) {
-          md5out = checksumPartialBlock();
-        } else {
-          md5out = checksumWholeBlock();
+        BlockChecksumType type =
+            getBlockChecksumOptions().getBlockChecksumType();
+        switch (type) {
+        case MD5CRC:
+          computeMd5Crc();
+          break;
+        case COMPOSITE_CRC:
+          computeCompositeCrc(getBlockChecksumOptions().getStripeLength());
+          break;
+        default:
+          throw new IOException(String.format(
+              "Unrecognized BlockChecksumType: %s", type));
         }
-        setOutBytes(md5out.getDigest());
-
-        LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
-            getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
       } finally {
         IOUtils.closeStream(getChecksumIn());
         IOUtils.closeStream(getMetadataIn());
       }
     }
 
+    private void computeMd5Crc() throws IOException {
+      MD5Hash md5out;
+      if (isPartialBlk() && getCrcPerBlock() > 0) {
+        md5out = checksumPartialBlock();
+      } else {
+        md5out = checksumWholeBlock();
+      }
+      setOutBytes(md5out.getDigest());
+
+      LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}",
+          getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out);
+    }
+
     private MD5Hash checksumWholeBlock() throws IOException {
       MD5Hash md5out = MD5Hash.digest(getChecksumIn());
       return md5out;
@@ -320,6 +354,56 @@ final class BlockChecksumHelper {
 
       return new MD5Hash(digester.digest());
     }
+
+    private void computeCompositeCrc(long stripeLength) throws IOException {
+      long checksumDataLength =
+          Math.min(getVisibleLength(), getRequestLength());
+      if (stripeLength <= 0 || stripeLength > checksumDataLength) {
+        stripeLength = checksumDataLength;
+      }
+
+      CrcComposer crcComposer = CrcComposer.newStripedCrcComposer(
+          getCrcType(), getBytesPerCRC(), stripeLength);
+      DataInputStream checksumIn = getChecksumIn();
+
+      // Whether or not we are getting the checksum for the entire block
+      // (which itself may not be a full block size and may have a final
+      // chunk smaller than getBytesPerCRC()), we begin with a number of full
+      // chunks, all of size getBytesPerCRC().
+      long numFullChunks = checksumDataLength / getBytesPerCRC();
+      crcComposer.update(checksumIn, numFullChunks, getBytesPerCRC());
+
+      // There may be a final partial chunk that is not full-sized. Unlike the
+      // MD5 case, we still consider this a "partial chunk" even if
+      // getRequestLength() == getVisibleLength(), since the CRC composition
+      // depends on the byte size of that final chunk, even if it already has
+      // a precomputed CRC stored in metadata. So there are two cases:
+      //   1. Reading only part of a block via getRequestLength(); we get the
+      //      crcPartialBlock() explicitly.
+      //   2. Reading the full visible length; the partial chunk already has
+      //      a CRC stored in block metadata, so we just continue reading
+      //      checksumIn.
+      long partialChunkSize = checksumDataLength % getBytesPerCRC();
+      if (partialChunkSize > 0) {
+        if (isPartialBlk()) {
+          byte[] partialChunkCrcBytes = crcPartialBlock();
+          crcComposer.update(
+              partialChunkCrcBytes, 0, partialChunkCrcBytes.length,
+              partialChunkSize);
+        } else {
+          int partialChunkCrc = checksumIn.readInt();
+          crcComposer.update(partialChunkCrc, partialChunkSize);
+        }
+      }
+
+      byte[] composedCrcs = crcComposer.digest();
+      setOutBytes(composedCrcs);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "block={}, getBytesPerCRC={}, crcPerBlock={}, compositeCrc={}",
+            getBlock(), getBytesPerCRC(), getCrcPerBlock(),
+            CrcUtil.toMultiCrcString(composedCrcs));
+      }
+    }
   }
 
   /**
@@ -335,19 +419,29 @@ final class BlockChecksumHelper {
     private final byte[] blockIndices;
     private final long requestedNumBytes;
 
-    private final DataOutputBuffer md5writer = new DataOutputBuffer();
+    private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer();
+
+    // Keeps track of the positions within blockChecksumBuf where each data
+    // block's checksum begins; for fixed-size block checksums this is easily
+    // calculated as a multiple of the checksum size, but for striped block
+    // CRCs, it's less error-prone to simply keep track of exact byte offsets
+    // before each block checksum is populated into the buffer.
+    private final int[] blockChecksumPositions;
 
-    BlockGroupNonStripedChecksumComputer(DataNode datanode,
-        StripedBlockInfo stripedBlockInfo,
-        long requestedNumBytes)
+    BlockGroupNonStripedChecksumComputer(
+        DataNode datanode,
+        StripedBlockInfo stripedBlockInfo,
+        long requestedNumBytes,
+        BlockChecksumOptions blockChecksumOptions)
         throws IOException {
-      super(datanode);
+      super(datanode, blockChecksumOptions);
       this.blockGroup = stripedBlockInfo.getBlock();
      this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
       this.datanodes = stripedBlockInfo.getDatanodes();
       this.blockTokens = stripedBlockInfo.getBlockTokens();
       this.blockIndices = stripedBlockInfo.getBlockIndices();
       this.requestedNumBytes = requestedNumBytes;
+      this.blockChecksumPositions = new int[this.ecPolicy.getNumDataUnits()];
     }
 
     private static class LiveBlockInfo {
@@ -383,6 +477,9 @@ final class BlockChecksumHelper {
       }
       long checksumLen = 0;
       for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) {
+        // Before populating the blockChecksum at this index, record the byte
+        // offset where it will begin.
+        blockChecksumPositions[idx] = blockChecksumBuf.getLength();
         try {
           ExtendedBlock block = getInternalBlock(numDataUnits, idx);
 
@@ -409,8 +506,75 @@ final class BlockChecksumHelper {
         }
       }
 
-      MD5Hash md5out = MD5Hash.digest(md5writer.getData());
-      setOutBytes(md5out.getDigest());
+      BlockChecksumType type = getBlockChecksumOptions().getBlockChecksumType();
+      switch (type) {
+      case MD5CRC:
+        MD5Hash md5out = MD5Hash.digest(blockChecksumBuf.getData());
+        setOutBytes(md5out.getDigest());
+        break;
+      case COMPOSITE_CRC:
+        byte[] digest = reassembleNonStripedCompositeCrc(checksumLen);
+        setOutBytes(digest);
+        break;
+      default:
+        throw new IOException(String.format(
+            "Unrecognized BlockChecksumType: %s", type));
+      }
     }
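Since a COMPOSITE_CRC result is a flat concatenation of 4-byte CRCs, one per stripe (the group-level code below walks such buffers with CrcUtil.readInt and a cursor advancing by 4 per CRC), a consumer can split it mechanically. A minimal sketch, assuming only the 4-byte big-endian layout implied by that cursor arithmetic; this helper is hypothetical and not part of the patch:

    import java.nio.ByteBuffer;

    /** Hypothetical helper: split a composed checksum into per-stripe CRCs. */
    public final class StripeCrcSplitter {
      public static int[] splitStripeCrcs(byte[] composedCrcs) {
        int numCrcs = composedCrcs.length / 4;  // one 4-byte CRC per stripe
        int[] crcs = new int[numCrcs];
        ByteBuffer buf = ByteBuffer.wrap(composedCrcs);  // big-endian default
        for (int i = 0; i < numCrcs; i++) {
          crcs[i] = buf.getInt();
        }
        return crcs;
      }
    }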
+
+    /**
+     * @param checksumLen The sum of bytes associated with the block checksum
+     *     data being digested into a block-group level checksum.
+     */
+    private byte[] reassembleNonStripedCompositeCrc(long checksumLen)
+        throws IOException {
+      int numDataUnits = ecPolicy.getNumDataUnits();
+      CrcComposer crcComposer = CrcComposer.newCrcComposer(
+          getCrcType(), ecPolicy.getCellSize());
+
+      // This should hold all the cell-granularity checksums of blk0
+      // followed by all cell checksums of blk1, etc. We must unstripe the
+      // cell checksums in order of logical file bytes. Also, note that the
+      // length of this array may not equal the number of actually valid
+      // bytes in the buffer (blockChecksumBuf.getLength()).
+      byte[] flatBlockChecksumData = blockChecksumBuf.getData();
+
+      // Initialize byte-level cursors to where each block's checksum begins
+      // inside the combined flattened buffer.
+      int[] blockChecksumCursors = new int[numDataUnits];
+      for (int idx = 0; idx < numDataUnits; ++idx) {
+        blockChecksumCursors[idx] = blockChecksumPositions[idx];
+      }
+
+      // Reassemble cell-level CRCs in the right order.
+      long numFullCells = checksumLen / ecPolicy.getCellSize();
+      for (long cellIndex = 0; cellIndex < numFullCells; ++cellIndex) {
+        int blockIndex = (int) (cellIndex % numDataUnits);
+        int checksumCursor = blockChecksumCursors[blockIndex];
+        int cellCrc = CrcUtil.readInt(
+            flatBlockChecksumData, checksumCursor);
+        blockChecksumCursors[blockIndex] += 4;
+        crcComposer.update(cellCrc, ecPolicy.getCellSize());
+      }
+      if (checksumLen % ecPolicy.getCellSize() != 0) {
+        // Final partial cell.
+        int blockIndex = (int) (numFullCells % numDataUnits);
+        int checksumCursor = blockChecksumCursors[blockIndex];
+        int cellCrc = CrcUtil.readInt(
+            flatBlockChecksumData, checksumCursor);
+        blockChecksumCursors[blockIndex] += 4;
+        crcComposer.update(cellCrc, checksumLen % ecPolicy.getCellSize());
+      }
+      byte[] digest = crcComposer.digest();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flatBlockChecksumData.length={}, numDataUnits={}, "
+                + "checksumLen={}, digest={}",
+            flatBlockChecksumData.length,
+            numDataUnits,
+            checksumLen,
+            CrcUtil.toSingleCrcString(digest));
+      }
+      return digest;
+    }
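The cursor walk above relies on the striping invariant that logical cell i of a block group lives on data block i % numDataUnits. A toy illustration of that index math (numbers invented):

    // With numDataUnits == 3, cell CRCs must be consumed round-robin:
    int numDataUnits = 3;
    for (long cellIndex = 0; cellIndex < 7; cellIndex++) {
      int blockIndex = (int) (cellIndex % numDataUnits);
      System.out.println("cell " + cellIndex + " -> block " + blockIndex);
    }
    // Prints: cell 0 -> block 0, cell 1 -> block 1, cell 2 -> block 2,
    // cell 3 -> block 0, ... so taking one CRC from each block's buffer in
    // turn yields CRCs in logical-file-byte order, which is exactly the
    // order CrcComposer needs to produce a layout-independent CRC.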
 
     private ExtendedBlock getInternalBlock(int numDataUnits, int idx) {
@@ -437,8 +601,26 @@ final class BlockChecksumHelper {
         LOG.debug("write to {}: {}, block={}",
             getDatanode(), Op.BLOCK_CHECKSUM, block);
 
-        // get block MD5
-        createSender(pair).blockChecksum(block, blockToken);
+        // get block checksum
+        // A BlockGroupChecksum of type COMPOSITE_CRC uses underlying
+        // BlockChecksums also of type COMPOSITE_CRC but with
+        // stripeLength == ecPolicy.getCellSize().
+        BlockChecksumOptions childOptions;
+        BlockChecksumType groupChecksumType =
+            getBlockChecksumOptions().getBlockChecksumType();
+        switch (groupChecksumType) {
+        case MD5CRC:
+          childOptions = getBlockChecksumOptions();
+          break;
+        case COMPOSITE_CRC:
+          childOptions = new BlockChecksumOptions(
+              BlockChecksumType.COMPOSITE_CRC, ecPolicy.getCellSize());
+          break;
+        default:
+          throw new IOException(
+              "Unknown BlockChecksumType: " + groupChecksumType);
+        }
+        createSender(pair).blockChecksum(block, blockToken, childOptions);
 
         final DataTransferProtos.BlockOpResponseProto reply =
             DataTransferProtos.BlockOpResponseProto.parseFrom(
@@ -463,10 +645,37 @@ final class BlockChecksumHelper {
 
         setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(),
             checksumData.getCrcPerBlock(), ct);
-        //read md5
-        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
-        md5.write(md5writer);
-        LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5);
+
+        switch (groupChecksumType) {
+        case MD5CRC:
+          //read md5
+          final MD5Hash md5 =
+              new MD5Hash(checksumData.getBlockChecksum().toByteArray());
+          md5.write(blockChecksumBuf);
+          LOG.debug("got reply from datanode:{}, md5={}",
+              targetDatanode, md5);
+          break;
+        case COMPOSITE_CRC:
+          BlockChecksumType returnedType = PBHelperClient.convert(
+              checksumData.getBlockChecksumOptions().getBlockChecksumType());
+          if (returnedType != BlockChecksumType.COMPOSITE_CRC) {
+            throw new IOException(String.format(
+                "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC",
+                returnedType));
+          }
+          byte[] checksumBytes =
+              checksumData.getBlockChecksum().toByteArray();
+          blockChecksumBuf.write(checksumBytes, 0, checksumBytes.length);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("got reply from datanode:{} for blockIdx:{}, checksum:{}",
+                targetDatanode, blockIdx,
+                CrcUtil.toMultiCrcString(checksumBytes));
+          }
+          break;
+        default:
+          throw new IOException(
+              "Unknown BlockChecksumType: " + groupChecksumType);
+        }
       }
     }
@@ -489,10 +698,16 @@ final class BlockChecksumHelper {
       StripedReconstructionInfo stripedReconInfo =
           new StripedReconstructionInfo(
           blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
+      BlockChecksumType groupChecksumType =
+          getBlockChecksumOptions().getBlockChecksumType();
       final StripedBlockChecksumReconstructor checksumRecon =
-          new StripedBlockChecksumReconstructor(
+          groupChecksumType == BlockChecksumType.COMPOSITE_CRC ?
+          new StripedBlockChecksumCompositeCrcReconstructor(
               getDatanode().getErasureCodingWorker(), stripedReconInfo,
-              md5writer, blockLength);
+              blockChecksumBuf, blockLength) :
+          new StripedBlockChecksumMd5CrcReconstructor(
+              getDatanode().getErasureCodingWorker(), stripedReconInfo,
+              blockChecksumBuf, blockLength);
       checksumRecon.reconstruct();
 
       DataChecksum checksum = checksumRecon.getChecksum();
@@ -501,8 +716,8 @@ final class BlockChecksumHelper {
       setOrVerifyChecksumProperties(errBlkIndex,
           checksum.getBytesPerChecksum(), crcPerBlock,
           checksum.getChecksumType());
-      LOG.debug("Recalculated checksum for the block index:{}, md5={}",
-          errBlkIndex, checksumRecon.getMD5());
+      LOG.debug("Recalculated checksum for the block index:{}, checksum={}",
+          errBlkIndex, checksumRecon.getDigestObject());
     }
 
     private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
@@ -524,8 +739,16 @@ final class BlockChecksumHelper {
         setCrcType(ct);
       } else if (getCrcType() != DataChecksum.Type.MIXED &&
           getCrcType() != ct) {
-        // if crc types are mixed in a file
-        setCrcType(DataChecksum.Type.MIXED);
+        BlockChecksumType groupChecksumType =
+            getBlockChecksumOptions().getBlockChecksumType();
+        if (groupChecksumType == BlockChecksumType.COMPOSITE_CRC) {
+          throw new IOException(String.format(
+              "BlockChecksumType COMPOSITE_CRC doesn't support MIXED "
+              + "underlying types; previous block was %s, next block is %s",
+              getCrcType(), ct));
+        } else {
+          setCrcType(DataChecksum.Type.MIXED);
+        }
       }
 
       if (blockIdx == 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index d0ded89..0bb1987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -968,15 +969,16 @@ class DataXceiver extends Receiver implements Runnable {
 
   @Override
   public void blockChecksum(ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken)
+      Token<BlockTokenIdentifier> blockToken,
+      BlockChecksumOptions blockChecksumOptions)
       throws IOException {
     updateCurrentThreadName("Getting checksum for block " + block);
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM,
         BlockTokenIdentifier.AccessMode.READ);
-    BlockChecksumComputer maker =
-        new ReplicatedBlockChecksumComputer(datanode, block);
+    BlockChecksumComputer maker = new ReplicatedBlockChecksumComputer(
+        datanode, block, blockChecksumOptions);
 
     try {
       maker.compute();
@@ -987,8 +989,10 @@ class DataXceiver extends Receiver implements Runnable {
           .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
               .setBytesPerCrc(maker.getBytesPerCRC())
               .setCrcPerBlock(maker.getCrcPerBlock())
-              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
-              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+              .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType()))
+              .setBlockChecksumOptions(
+                  PBHelperClient.convert(blockChecksumOptions)))
           .build()
           .writeDelimitedTo(out);
       out.flush();
@@ -1007,7 +1011,9 @@ class DataXceiver extends Receiver implements Runnable {
 
   @Override
   public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
-      final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
+      final Token<BlockTokenIdentifier> blockToken,
+      long requestedNumBytes,
+      BlockChecksumOptions blockChecksumOptions)
       throws IOException {
     final ExtendedBlock block = stripedBlockInfo.getBlock();
     updateCurrentThreadName("Getting checksum for block group" +
@@ -1018,7 +1024,7 @@ class DataXceiver extends Receiver implements Runnable {
 
     AbstractBlockChecksumComputer maker =
         new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
-            requestedNumBytes);
+            requestedNumBytes, blockChecksumOptions);
 
     try {
       maker.compute();
@@ -1029,8 +1035,10 @@ class DataXceiver extends Receiver implements Runnable {
           .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
              .setBytesPerCrc(maker.getBytesPerCRC())
               .setCrcPerBlock(maker.getCrcPerBlock())
-              .setMd5(ByteString.copyFrom(maker.getOutBytes()))
-              .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+              .setBlockChecksum(ByteString.copyFrom(maker.getOutBytes()))
+              .setCrcType(PBHelperClient.convert(maker.getCrcType()))
+              .setBlockChecksumOptions(
+                  PBHelperClient.convert(blockChecksumOptions)))
           .build()
           .writeDelimitedTo(out);
       out.flush();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java
new file mode 100644
index 0000000..afcc8cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumCompositeCrcReconstructor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.util.CrcComposer;
+
+/**
+ * Computes striped composite CRCs over reconstructed chunk CRCs.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumCompositeCrcReconstructor
+    extends StripedBlockChecksumReconstructor {
+  private final int ecPolicyCellSize;
+
+  private byte[] digestValue;
+  private CrcComposer digester;
+
+  public StripedBlockChecksumCompositeCrcReconstructor(
+      ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo,
+      DataOutputBuffer checksumWriter,
+      long requestedBlockLength) throws IOException {
+    super(worker, stripedReconInfo, checksumWriter, requestedBlockLength);
+    this.ecPolicyCellSize = stripedReconInfo.getEcPolicy().getCellSize();
+  }
+
+  @Override
+  public Object getDigestObject() {
+    return digestValue;
+  }
+
+  @Override
+  void prepareDigester() throws IOException {
+    digester = CrcComposer.newStripedCrcComposer(
+        getChecksum().getChecksumType(),
+        getChecksum().getBytesPerChecksum(),
+        ecPolicyCellSize);
+  }
+
+  @Override
+  void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException {
+    if (digester == null) {
+      throw new IOException(String.format(
+          "Called updateDigester with checksumBytes.length=%d, "
+          + "dataBytesPerChecksum=%d but digester is null",
+          checksumBytes.length, dataBytesPerChecksum));
+    }
+    digester.update(
+        checksumBytes, 0, checksumBytes.length, dataBytesPerChecksum);
+  }
+
+  @Override
+  void commitDigest() throws IOException {
+    if (digester == null) {
+      throw new IOException("Called commitDigest() but digester is null");
+    }
+    digestValue = digester.digest();
+    getChecksumWriter().write(digestValue, 0, digestValue.length);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java
new file mode 100644
index 0000000..2f809b8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumMd5CrcReconstructor.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * Computes running MD5-of-CRC over reconstructed chunk CRCs.
+ */
+@InterfaceAudience.Private
+public class StripedBlockChecksumMd5CrcReconstructor
+    extends StripedBlockChecksumReconstructor {
+  private MD5Hash md5;
+  private MessageDigest digester;
+
+  public StripedBlockChecksumMd5CrcReconstructor(ErasureCodingWorker worker,
+      StripedReconstructionInfo stripedReconInfo,
+      DataOutputBuffer checksumWriter,
+      long requestedBlockLength) throws IOException {
+    super(worker, stripedReconInfo, checksumWriter, requestedBlockLength);
+  }
+
+  @Override
+  public Object getDigestObject() {
+    return md5;
+  }
+
+  @Override
+  void prepareDigester() throws IOException {
+    digester = MD5Hash.getDigester();
+  }
+
+  @Override
+  void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException {
+    if (digester == null) {
+      throw new IOException(String.format(
+          "Called updateDigester with checksumBytes.length=%d, "
+          + "dataBytesPerChecksum=%d but digester is null",
+          checksumBytes.length, dataBytesPerChecksum));
+    }
+    digester.update(checksumBytes, 0, checksumBytes.length);
+  }
+
+  @Override
+  void commitDigest() throws IOException {
+    if (digester == null) {
+      throw new IOException("Called commitDigest() but digester is null");
+    }
+    byte[] digest = digester.digest();
+    md5 = new MD5Hash(digest);
+    md5.write(getChecksumWriter());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index d530a8e..b2e6496 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MD5Hash;
 
 /**
  * StripedBlockChecksumReconstructor reconstruct one or more missed striped
 *
@@ -33,18 +31,17 @@ import org.apache.hadoop.io.MD5Hash;
  * using the newly reconstructed block.
  */
 @InterfaceAudience.Private
-public class StripedBlockChecksumReconstructor extends StripedReconstructor {
-
+public abstract class StripedBlockChecksumReconstructor
+    extends StripedReconstructor {
   private ByteBuffer targetBuffer;
   private final byte[] targetIndices;
 
   private byte[] checksumBuf;
   private DataOutputBuffer checksumWriter;
-  private MD5Hash md5;
   private long checksumDataLen;
   private long requestedLen;
 
-  public StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
+  protected StripedBlockChecksumReconstructor(ErasureCodingWorker worker,
      StripedReconstructionInfo stripedReconInfo,
       DataOutputBuffer checksumWriter,
       long requestedBlockLength) throws IOException {
@@ -72,8 +69,9 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
     checksumBuf = new byte[tmpLen];
   }
 
+  @Override
   public void reconstruct() throws IOException {
-    MessageDigest digester = MD5Hash.getDigester();
+    prepareDigester();
     long maxTargetLength = getMaxTargetLength();
     try {
       while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
@@ -88,24 +86,54 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         reconstructTargets(toReconstructLen);
 
         // step3: calculate checksum
-        checksumDataLen += checksumWithTargetOutput(targetBuffer.array(),
-            toReconstructLen, digester);
+        checksumDataLen += checksumWithTargetOutput(
+            targetBuffer.array(), toReconstructLen);
 
         updatePositionInBlock(toReconstructLen);
         requestedLen -= toReconstructLen;
         clearBuffers();
       }
 
-      byte[] digest = digester.digest();
-      md5 = new MD5Hash(digest);
-      md5.write(checksumWriter);
+      commitDigest();
     } finally {
       cleanup();
     }
   }
 
-  private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen,
-      MessageDigest digester) throws IOException {
+  /**
+   * Should return a representation of a completed/reconstructed digest which
+   * is suitable for debug printing.
+   */
+  public abstract Object getDigestObject();
+
+  /**
+   * This will be called before starting reconstruction.
+   */
+  abstract void prepareDigester() throws IOException;
+
+  /**
+   * This will be called repeatedly with chunked checksums computed in-flight
+   * over reconstructed data.
+   *
+   * @param dataBytesPerChecksum the number of underlying data bytes
+   *     corresponding to each checksum inside {@code checksumBytes}.
+   */
+  abstract void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum)
+      throws IOException;
+
+  /**
+   * This will be called when reconstruction of entire requested length is
+   * complete and any final digests should be committed to
+   * implementation-specific output fields.
+   */
+  abstract void commitDigest() throws IOException;
+
+  protected DataOutputBuffer getChecksumWriter() {
+    return checksumWriter;
+  }
+
+  private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen)
+      throws IOException {
     long checksumDataLength = 0;
     // Calculate partial block checksum. There are two cases.
     // case-1) length of data bytes which is fraction of bytesPerCRC
@@ -128,7 +156,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
       checksumBuf = new byte[checksumRemaining];
       getChecksum().calculateChunkedSums(outputData, dataOffset,
           remainingLen, checksumBuf, 0);
-      digester.update(checksumBuf, 0, checksumBuf.length);
+      updateDigester(checksumBuf, getChecksum().getBytesPerChecksum());
       checksumDataLength = checksumBuf.length;
       dataOffset = remainingLen;
     }
@@ -139,7 +167,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
       getChecksum().reset();
       getChecksum().update(outputData, dataOffset, partialLength);
       getChecksum().writeValue(partialCrc, 0, true);
-      digester.update(partialCrc);
+      updateDigester(partialCrc, partialLength);
       checksumDataLength += partialCrc.length;
     }
 
@@ -151,7 +179,7 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
         outputData.length, checksumBuf, 0);
 
     // updates digest using the checksum array of bytes
-    digester.update(checksumBuf, 0, checksumBuf.length);
+    updateDigester(checksumBuf, getChecksum().getBytesPerChecksum());
     return checksumBuf.length;
   }
 
@@ -176,10 +204,6 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor {
     targetBuffer.clear();
   }
 
-  public MD5Hash getMD5() {
-    return md5;
-  }
-
   public long getChecksumDataLen() {
     return checksumDataLen;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index ce1254c..29c0078 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -79,6 +79,7 @@ class StripedBlockReconstructor extends StripedReconstructor
     }
   }
 
+  @Override
   void reconstruct() throws IOException {
     while (getPositionInBlock() < getMaxTargetLength()) {
       DataNodeFaultInjector.get().stripedBlockReconstruction();
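Taken together, the two new reconstructor classes and the abstract base above form a template method: reconstruct() drives decode-and-checksum, while prepareDigester()/updateDigester()/commitDigest() supply the digest strategy. To make the contract concrete, here is a purely hypothetical third strategy, not part of the patch; it would have to live in the same package, since the hooks are package-private:

    package org.apache.hadoop.hdfs.server.datanode.erasurecode;

    import java.io.IOException;
    import org.apache.hadoop.io.DataOutputBuffer;

    // Hypothetical example only: a trivial digest that XORs chunk CRC bytes
    // into a 4-byte accumulator. It exists just to show the hook contract.
    public class StripedBlockChecksumXorReconstructor
        extends StripedBlockChecksumReconstructor {
      private byte[] xorValue;

      public StripedBlockChecksumXorReconstructor(ErasureCodingWorker worker,
          StripedReconstructionInfo stripedReconInfo,
          DataOutputBuffer checksumWriter,
          long requestedBlockLength) throws IOException {
        super(worker, stripedReconInfo, checksumWriter, requestedBlockLength);
      }

      @Override
      public Object getDigestObject() {
        return xorValue;
      }

      @Override
      void prepareDigester() {
        xorValue = new byte[4];  // one CRC-sized accumulator
      }

      @Override
      void updateDigester(byte[] checksumBytes, int dataBytesPerChecksum) {
        for (int i = 0; i < checksumBytes.length; i++) {
          xorValue[i % 4] ^= checksumBytes[i];
        }
      }

      @Override
      void commitDigest() throws IOException {
        getChecksumWriter().write(xorValue, 0, xorValue.length);
      }
    }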
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1e83325..f8cce60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3541,6 +3541,17 @@
 </property>
 
 <property>
+  <name>dfs.checksum.combine.mode</name>
+  <value>MD5MD5CRC</value>
+  <description>
+    Defines how lower-level chunk/block checksums are combined into file-level
+    checksums; the original MD5MD5CRC mode is not comparable between files
+    with different block layouts, while modes like COMPOSITE_CRC are
+    comparable independently of block layout.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.block.write.locateFollowingBlock.retries</name>
   <value>5</value>
   <description>
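Clients can flip the same knob per-job rather than cluster-wide in hdfs-site.xml; the new tests later in this commit do so through HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, which corresponds to this property name. A minimal sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    Configuration conf = new Configuration();
    // MD5MD5CRC remains the default; opt in to layout-independent checksums.
    conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
    // Subsequent getFileChecksum() calls through this FileSystem will use
    // COMPOSITE_CRC combining.
    FileSystem fs = FileSystem.get(conf);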
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 4f9f260..adede5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -857,6 +857,20 @@ public class DFSTestUtil {
     }
   }
 
+  /* Write the given bytes to the given file using the specified blockSize */
+  public static void writeFile(
+      FileSystem fs, Path p, byte[] bytes, long blockSize)
+      throws IOException {
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+    try (InputStream is = new ByteArrayInputStream(bytes);
+        FSDataOutputStream os = fs.create(
+            p, false, 4096, fs.getDefaultReplication(p), blockSize)) {
+      IOUtils.copyBytes(is, os, bytes.length);
+    }
+  }
+
   /* Write the given string to the given file */
   public static void writeFile(FileSystem fs, Path p, String s)
       throws IOException {
@@ -901,14 +915,27 @@ public class DFSTestUtil {
    */
   public static void appendFileNewBlock(DistributedFileSystem fs,
       Path p, int length) throws IOException {
-    assert fs.exists(p);
     assert length >= 0;
     byte[] toAppend = new byte[length];
     Random random = new Random();
     random.nextBytes(toAppend);
+    appendFileNewBlock(fs, p, toAppend);
+  }
+
+  /**
+   * Append specified bytes to a given file, starting with new block.
+   *
+   * @param fs The file system
+   * @param p Path of the file to append
+   * @param bytes The data to append
+   * @throws IOException
+   */
+  public static void appendFileNewBlock(DistributedFileSystem fs,
+      Path p, byte[] bytes) throws IOException {
+    assert fs.exists(p);
     try (FSDataOutputStream out = fs.append(p,
         EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
-      out.write(toAppend);
+      out.write(bytes);
     }
   }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
index d201ce1..0ff2d4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -30,7 +31,9 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -74,6 +77,9 @@ public class TestFileChecksum {
   private String stripedFile2 = ecDir + "/stripedFileChecksum2";
   private String replicatedFile = "/replicatedFileChecksum";
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   @Before
   public void setup() throws IOException {
     int numDNs = dataBlocks + parityBlocks + 2;
@@ -83,6 +89,7 @@ public class TestFileChecksum {
         false);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    customizeConf(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     Path ecPath = new Path(ecDir);
     cluster.getFileSystem().mkdir(ecPath, FsPermission.getDirDefault());
@@ -106,6 +113,39 @@ public class TestFileChecksum {
     }
   }
 
+  /**
+   * Subclasses may customize the conf to run the full set of tests under
+   * different conditions.
+   */
+  protected void customizeConf(Configuration preparedConf) {
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether equivalent files
+   * in striped and replicated formats are expected to have the same
+   * overall FileChecksum.
+   */
+  protected boolean expectComparableStripedAndReplicatedFiles() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether equivalent files
+   * in replicated formats with different block sizes are expected to have the
+   * same overall FileChecksum.
+   */
+  protected boolean expectComparableDifferentBlockSizeReplicatedFiles() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether checksums are
+   * supported for files where different blocks have different bytesPerCRC.
+   */
+  protected boolean expectSupportForSingleFileMixedBytesPerChecksum() {
+    return false;
+  }
+
   @Test(timeout = 90000)
   public void testStripedFileChecksum1() throws Exception {
     int length = 0;
@@ -182,7 +222,30 @@ public class TestFileChecksum {
     FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
         10, false);
 
-    Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum));
+    if (expectComparableStripedAndReplicatedFiles()) {
+      Assert.assertEquals(stripedFileChecksum1, replicatedFileChecksum);
+    } else {
+      Assert.assertNotEquals(stripedFileChecksum1, replicatedFileChecksum);
+    }
+  }
+
+  @Test(timeout = 90000)
+  public void testDifferentBlockSizeReplicatedFileChecksum() throws Exception {
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
+    String replicatedFile1 = "/replicatedFile1";
+    String replicatedFile2 = "/replicatedFile2";
+    DFSTestUtil.writeFile(
+        fs, new Path(replicatedFile1), fileData, blockSize);
+    DFSTestUtil.writeFile(
+        fs, new Path(replicatedFile2), fileData, blockSize / 2);
+    FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
+    FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
+
+    if (expectComparableDifferentBlockSizeReplicatedFiles()) {
+      Assert.assertEquals(checksum1, checksum2);
+    } else {
+      Assert.assertNotEquals(checksum1, checksum2);
+    }
   }
 
   @Test(timeout = 90000)
@@ -471,6 +534,40 @@ public class TestFileChecksum {
         bytesPerCRC - 1);
   }
 
+  @Test(timeout = 90000)
+  public void testMixedBytesPerChecksum() throws Exception {
+    int fileLength = bytesPerCRC * 3;
+    byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
+    String replicatedFile1 = "/replicatedFile1";
+
+    // Split file into two parts.
+    byte[] fileDataPart1 = new byte[bytesPerCRC * 2];
+    System.arraycopy(fileData, 0, fileDataPart1, 0, fileDataPart1.length);
+    byte[] fileDataPart2 = new byte[fileData.length - fileDataPart1.length];
+    System.arraycopy(
+        fileData, fileDataPart1.length, fileDataPart2, 0,
+        fileDataPart2.length);
+
+    DFSTestUtil.writeFile(fs, new Path(replicatedFile1), fileDataPart1);
+
+    // Modify bytesPerCRC for the second part, which we append as a
+    // separate block.
+    conf.setInt(
+        HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesPerCRC / 2);
+    DFSTestUtil.appendFileNewBlock(
+        ((DistributedFileSystem) FileSystem.newInstance(conf)),
+        new Path(replicatedFile1), fileDataPart2);
+
+    if (expectSupportForSingleFileMixedBytesPerChecksum()) {
+      String replicatedFile2 = "/replicatedFile2";
+      DFSTestUtil.writeFile(fs, new Path(replicatedFile2), fileData);
+      FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
+      FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
+      Assert.assertEquals(checksum1, checksum2);
+    } else {
+      exception.expect(IOException.class);
+      FileChecksum checksum = getFileChecksum(replicatedFile1, -1, false);
+    }
+  }
+
   private FileChecksum getFileChecksum(String filePath, int range,
       boolean killDn) throws Exception {
     int dnIdxToDie = -1;
@@ -537,4 +634,4 @@ public class TestFileChecksum {
     return -1;
   }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
new file mode 100644
index 0000000..87fb7da
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksumCompositeCrc.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+/**
+ * End-to-end tests for COMPOSITE_CRC combine mode.
+ */
+public class TestFileChecksumCompositeCrc extends TestFileChecksum {
+  @Override
+  protected void customizeConf(Configuration conf) {
+    conf.set(
+        HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
+  }
+
+  @Override
+  protected boolean expectComparableStripedAndReplicatedFiles() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectComparableDifferentBlockSizeReplicatedFiles() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectSupportForSingleFileMixedBytesPerChecksum() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index da56c15..22f84c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockChecksumType;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -683,6 +684,19 @@ public class TestPBHelper {
   }
 
   @Test
+  public void testBlockChecksumTypeProto() {
+    assertEquals(BlockChecksumType.MD5CRC,
+        PBHelperClient.convert(HdfsProtos.BlockChecksumTypeProto.MD5CRC));
+    assertEquals(BlockChecksumType.COMPOSITE_CRC,
+        PBHelperClient.convert(
+            HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC));
+    assertEquals(PBHelperClient.convert(BlockChecksumType.MD5CRC),
+        HdfsProtos.BlockChecksumTypeProto.MD5CRC);
+    assertEquals(PBHelperClient.convert(BlockChecksumType.COMPOSITE_CRC),
+        HdfsProtos.BlockChecksumTypeProto.COMPOSITE_CRC);
+  }
+
+  @Test
   public void testAclEntryProto() {
     // All fields populated.
     AclEntry e1 = new AclEntry.Builder().setName("test")
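The overrides in TestFileChecksumCompositeCrc above capture the user-visible payoff: under COMPOSITE_CRC, FileChecksum values become comparable across block layouts. A hedged sketch of that comparison from application code (paths illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    conf.set("dfs.checksum.combine.mode", "COMPOSITE_CRC");
    FileSystem fs = FileSystem.get(conf);
    // The same bytes written as an erasure-coded file and as a replicated
    // file (or with different block sizes) should now yield equal checksums.
    FileChecksum ec = fs.getFileChecksum(new Path("/striped/file"));
    FileChecksum rep = fs.getFileChecksum(new Path("/replicated/file"));
    boolean sameContents = ec.equals(rep);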
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
index bbe7e8d..30e4683 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
@@ -73,20 +73,42 @@ public class TestCopyMapper {
   private static final String SOURCE_PATH = "/tmp/source";
   private static final String TARGET_PATH = "/tmp/target";
 
-  private static Configuration configuration;
-
   @BeforeClass
   public static void setup() throws Exception {
-    configuration = getConfigurationForCluster();
-    cluster = new MiniDFSCluster.Builder(configuration)
+    Configuration configuration = getConfigurationForCluster();
+    setCluster(new MiniDFSCluster.Builder(configuration)
         .numDataNodes(1)
         .format(true)
-        .build();
+        .build());
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether copying files with
+   * non-default block sizes without setting BLOCKSIZE as a preserved attribute
+   * is expected to succeed with CRC checks enabled.
+   */
+  protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() {
+    return false;
+  }
+
+  /**
+   * Subclasses may override this method to indicate whether copying files with
+   * non-default bytes-per-crc without setting CHECKSUMTYPE as a preserved
+   * attribute is expected to succeed with CRC checks enabled.
+   */
+  protected boolean expectDifferentBytesPerCrcToSucceed() {
+    return false;
+  }
+
+  protected static void setCluster(MiniDFSCluster c) {
+    cluster = c;
   }
 
-  private static Configuration getConfigurationForCluster() throws IOException {
+  protected static Configuration getConfigurationForCluster()
+      throws IOException {
     Configuration configuration = new Configuration();
-    System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
+    System.setProperty(
+        "test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
     configuration.set("hadoop.log.dir", "target/tmp");
     configuration.set("dfs.namenode.fs-limits.min-block-size", "0");
     LOG.debug("fs.default.name  == " + configuration.get("fs.default.name"));
@@ -136,7 +158,8 @@ public class TestCopyMapper {
     }
   }
 
-  private static void createSourceDataWithDifferentBlockSize() throws Exception {
+  private static void createSourceDataWithDifferentBlockSize()
+      throws Exception {
     mkdirs(SOURCE_PATH + "/1");
     mkdirs(SOURCE_PATH + "/2");
     mkdirs(SOURCE_PATH + "/2/3/4");
@@ -163,6 +186,21 @@ public class TestCopyMapper {
         512));
   }
 
+  private static void createSourceDataWithDifferentBytesPerCrc()
+      throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6", false,
+        new ChecksumOpt(DataChecksum.Type.CRC32C, 32));
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9", false,
+        new ChecksumOpt(DataChecksum.Type.CRC32C, 64));
+  }
+
   private static void mkdirs(String path) throws Exception {
     FileSystem fileSystem = cluster.getFileSystem();
     final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@@ -281,7 +319,7 @@ public class TestCopyMapper {
             path)), context);
       }
 
-      verifyCopy(fs, false);
+      verifyCopy(fs, false, true);
       // verify that we only copied new appended data
       Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
          .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
          .getValue());
@@ -317,6 +355,11 @@ public class TestCopyMapper {
     EnumSet<DistCpOptions.FileAttribute> fileAttributes
         = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
     if (preserveChecksum) {
+      // We created source files with both different checksum types and
+      // non-default block sizes; here we don't explicitly add BLOCKSIZE
+      // as a preserved attribute, but the current behavior is that
+      // preserving CHECKSUMTYPE also automatically implies preserving
+      // BLOCKSIZE.
       fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
     }
     configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
@@ -339,7 +382,7 @@ public class TestCopyMapper {
     }
 
     // Check that the maps worked.
-    verifyCopy(fs, preserveChecksum);
+    verifyCopy(fs, preserveChecksum, true);
     Assert.assertEquals(numFiles, stubContext.getReporter()
         .getCounter(CopyMapper.Counter.COPY).getValue());
     Assert.assertEquals(numDirs, stubContext.getReporter()
@@ -361,7 +404,8 @@ public class TestCopyMapper {
     }
   }
 
-  private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+  private void verifyCopy(
+      FileSystem fs, boolean preserveChecksum, boolean preserveReplication)
       throws Exception {
     for (Path path : pathList) {
       final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
@@ -370,8 +414,10 @@ public class TestCopyMapper {
       Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
       FileStatus sourceStatus = fs.getFileStatus(path);
      FileStatus targetStatus = fs.getFileStatus(targetPath);
-      Assert.assertEquals(sourceStatus.getReplication(),
-          targetStatus.getReplication());
+      if (preserveReplication) {
+        Assert.assertEquals(sourceStatus.getReplication(),
+            targetStatus.getReplication());
+      }
       if (preserveChecksum) {
         Assert.assertEquals(sourceStatus.getBlockSize(),
             targetStatus.getBlockSize());
@@ -505,7 +551,7 @@ public class TestCopyMapper {
           @Override
          public FileSystem run() {
            try {
-              return FileSystem.get(configuration);
+              return FileSystem.get(cluster.getConfiguration(0));
            } catch (IOException e) {
              LOG.error("Exception encountered ", e);
              Assert.fail("Test failed: " + e.getMessage());
@@ -574,7 +620,7 @@ public class TestCopyMapper {
          @Override
          public FileSystem run() {
            try {
-              return FileSystem.get(configuration);
+              return FileSystem.get(cluster.getConfiguration(0));
            } catch (IOException e) {
              LOG.error("Exception encountered ", e);
              Assert.fail("Test failed: " + e.getMessage());
@@ -649,7 +695,7 @@ public class TestCopyMapper {
          @Override
          public FileSystem run() {
            try {
-              return FileSystem.get(configuration);
+              return FileSystem.get(cluster.getConfiguration(0));
            } catch (IOException e) {
              LOG.error("Exception encountered ", e);
              Assert.fail("Test failed: " + e.getMessage());
@@ -730,7 +776,7 @@ public class TestCopyMapper {
          @Override
          public FileSystem run() {
            try {
-              return FileSystem.get(configuration);
+              return FileSystem.get(cluster.getConfiguration(0));
            } catch (IOException e) {
              LOG.error("Exception encountered ", e);
              Assert.fail("Test failed: " + e.getMessage());
@@ -887,7 +933,7 @@ public class TestCopyMapper {
          @Override
          public FileSystem run() {
            try {
-              return FileSystem.get(configuration);
+              return FileSystem.get(cluster.getConfiguration(0));
            } catch (IOException e) {
              LOG.error("Exception encountered when get FileSystem.", e);
              throw new RuntimeException(e);
@@ -938,12 +984,13 @@ public class TestCopyMapper {
   }
 
   @Test(timeout=40000)
-  public void testCopyFailOnBlockSizeDifference() throws Exception {
+  public void testCopyWithDifferentBlockSizes() throws Exception {
     try {
       deleteState();
       createSourceDataWithDifferentBlockSize();
 
       FileSystem fs = cluster.getFileSystem();
+
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
       Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
          = stubContext.getContext();
@@ -959,17 +1006,79 @@ public class TestCopyMapper {
 
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
-            path)), new CopyListingFileStatus(fileStatus), context);
+        copyMapper.map(
+            new Text(
+                DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            new CopyListingFileStatus(fileStatus), context);
       }
@@ -959,17 +1006,79 @@ public class TestCopyMapper {
 
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
-            path)), new CopyListingFileStatus(fileStatus), context);
+        copyMapper.map(
+            new Text(
+                DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            new CopyListingFileStatus(fileStatus), context);
       }
 
-      Assert.fail("Copy should have failed because of block-size difference.");
+      if (expectDifferentBlockSizesMultipleBlocksToSucceed()) {
+        verifyCopy(fs, false, false);
+      } else {
+        Assert.fail(
+            "Copy should have failed because of block-size difference.");
+      }
+    } catch (Exception exception) {
+      if (expectDifferentBlockSizesMultipleBlocksToSucceed()) {
+        throw exception;
+      } else {
+        // Check that the exception suggests the use of -pb/-skipcrccheck.
+        // This could be refactored to use LambdaTestUtils if we add support
+        // for listing multiple different independent substrings to expect
+        // in the exception message and add support for LambdaTestUtils to
+        // inspect the transitive cause and/or suppressed exceptions as well.
+        Throwable cause = exception.getCause().getCause();
+        GenericTestUtils.assertExceptionContains("-pb", cause);
+        GenericTestUtils.assertExceptionContains("-skipcrccheck", cause);
+      }
     }
-    catch (IOException exception) {
-      // Check that the exception suggests the use of -pb/-skipcrccheck.
-      Throwable cause = exception.getCause().getCause();
-      GenericTestUtils.assertExceptionContains("-pb", cause);
-      GenericTestUtils.assertExceptionContains("-skipcrccheck", cause);
+  }
+
+  @Test(timeout=40000)
+  public void testCopyWithDifferentBytesPerCrc() throws Exception {
+    try {
+      deleteState();
+      createSourceDataWithDifferentBytesPerCrc();
+
+      FileSystem fs = cluster.getFileSystem();
+
+      CopyMapper copyMapper = new CopyMapper();
+      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+          = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+          DistCpUtils.packAttributes(fileAttributes));
+
+      copyMapper.setup(context);
+
+      for (Path path : pathList) {
+        final FileStatus fileStatus = fs.getFileStatus(path);
+        copyMapper.map(
+            new Text(
+                DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+            new CopyListingFileStatus(fileStatus), context);
+      }
+
+      if (expectDifferentBytesPerCrcToSucceed()) {
+        verifyCopy(fs, false, false);
+      } else {
+        Assert.fail(
+            "Copy should have failed because of bytes-per-crc difference.");
+      }
+    } catch (Exception exception) {
+      if (expectDifferentBytesPerCrcToSucceed()) {
+        throw exception;
+      } else {
+        // This could be refactored to use LambdaTestUtils if we add support
+        // for LambdaTestUtils to inspect the transitive cause and/or
+        // suppressed exceptions as well.
+        Throwable cause = exception.getCause().getCause();
+        GenericTestUtils.assertExceptionContains("mismatch", cause);
+      }
     }
   }
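The comments in the new catch blocks explain why the assertions unwrap exception.getCause().getCause() by hand: LambdaTestUtils.intercept() checks a single expected substring against the thrown exception itself and does not inspect nested causes or suppressed exceptions. A self-contained sketch of what the current API does support, assuming the intercept(Class, String, Callable) overload; the class name and thrown message are fabricated for illustration:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    import org.apache.hadoop.test.LambdaTestUtils;

    public class InterceptSketch {
      public static void main(String[] args) throws Exception {
        // intercept() runs the callable, asserts that it throws the given
        // exception class, and asserts that the exception's text contains
        // the given substring; it only examines the outermost exception.
        IOException caught = LambdaTestUtils.intercept(
            IOException.class, "mismatch",
            new Callable<Void>() {
              @Override
              public Void call() throws Exception {
                throw new IOException("checksum mismatch between source/target");
              }
            });
        System.out.println("Caught as expected: " + caught.getMessage());
      }
    }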
@@ -980,6 +1089,7 @@ public class TestCopyMapper {
       createSourceData();
 
       FileSystem fs = cluster.getFileSystem();
+
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
       Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
@@ -1010,6 +1120,12 @@ public class TestCopyMapper {
         final FileStatus source = fs.getFileStatus(path);
         final FileStatus target = fs.getFileStatus(targetPath);
         if (!source.isDirectory() ) {
+          // The reason the checksum check succeeds despite block sizes not
+          // matching between the two is that when only one block is ever
+          // written (partial or complete), the crcPerBlock is not included
+          // in the FileChecksum algorithmName. If we had instead written
+          // a large enough file to exceed the blocksize, then the copy
+          // would not have succeeded.
           Assert.assertTrue(preserve ||
               source.getBlockSize() != target.getBlockSize());
           Assert.assertTrue(preserve ||
@@ -1020,8 +1136,7 @@ public class TestCopyMapper {
               source.getReplication() == target.getReplication());
         }
       }
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       Assert.assertTrue("Unexpected exception: " + e.getMessage(), false);
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c9cdad6/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java
new file mode 100644
index 0000000..6ed86e3
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+import org.junit.BeforeClass;
+
+/**
+ * End-to-end tests for COMPOSITE_CRC combine mode.
+ */
+public class TestCopyMapperCompositeCrc extends TestCopyMapper {
+  @BeforeClass
+  public static void setup() throws Exception {
+    Configuration configuration = TestCopyMapper.getConfigurationForCluster();
+    configuration.set(
+        HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY, "COMPOSITE_CRC");
+    TestCopyMapper.setCluster(new MiniDFSCluster.Builder(configuration)
+        .numDataNodes(1)
+        .format(true)
+        .build());
+  }
+
+  @Override
+  protected boolean expectDifferentBlockSizesMultipleBlocksToSucceed() {
+    return true;
+  }
+
+  @Override
+  protected boolean expectDifferentBytesPerCrcToSucceed() {
+    return true;
+  }
+}
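To make the subclass's setup concrete: setting HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY (dfs.checksum.combine.mode) to COMPOSITE_CRC switches getFileChecksum() from the MD5-of-CRCs algorithm to a composite CRC, so file checksums become independent of block size and bytes-per-checksum chunking; that is why both expectDifferentBlockSizesMultipleBlocksToSucceed() and expectDifferentBytesPerCrcToSucceed() return true in this subclass. A hedged client-side sketch; the class name and paths are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class CompositeCrcSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY,
            "COMPOSITE_CRC");

        FileSystem fs = FileSystem.get(conf);
        // Two byte-identical files written with different block sizes or
        // different bytes-per-crc settings.
        FileChecksum a = fs.getFileChecksum(new Path("/data/copy1"));
        FileChecksum b = fs.getFileChecksum(new Path("/data/copy2"));
        // Under COMPOSITE_CRC these compare equal; under the default
        // MD5-of-CRCs mode they generally would not.
        System.out.println("checksums equal: " + a.equals(b));
      }
    }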