Repository: hadoop Updated Branches: refs/heads/branch-2 4c6a1509c -> d2d038b0a
HDFS-4660. Block corruption can happen during pipeline recovery. Contributed by Kihwal Lee. (cherry picked from commit c74517c46bf00af408ed866b6577623cdec02de1) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2d038b0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2d038b0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2d038b0 Branch: refs/heads/branch-2 Commit: d2d038b0a0599175db2e7e28174da6e4edac9a25 Parents: 4c6a150 Author: Kihwal Lee <kih...@apache.org> Authored: Tue Jun 16 15:42:00 2015 -0500 Committer: Kihwal Lee <kih...@apache.org> Committed: Tue Jun 16 15:42:00 2015 -0500 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hdfs/server/datanode/BlockReceiver.java | 126 ++++++++++++++----- 2 files changed, 96 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2d038b0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 16f880d..dd6aff7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -689,6 +689,8 @@ Release 2.7.1 - UNRELEASED HDFS-8597. Fix TestFSImage#testZeroBlockSize on Windows. (Xiaoyu Yao) + HDFS-4660. 
Block corruption can happen during pipeline recovery (kihwal) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2d038b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index c46892d..2468f43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -588,29 +588,59 @@ class BlockReceiver implements Closeable { try { long onDiskLen = replicaInfo.getBytesOnDisk(); if (onDiskLen<offsetInBlock) { - //finally write to the disk : - - if (onDiskLen % bytesPerChecksum != 0) { - // prepare to overwrite last checksum - adjustCrcFilePosition(); + // Normally the beginning of an incoming packet is aligned with the + // existing data on disk. If the beginning packet data offset is not + // checksum chunk aligned, the end of packet will not go beyond the + // next chunk boundary. + // When a failure-recovery is involved, the client state and the + // the datanode state may not exactly agree. I.e. the client may + // resend part of data that is already on disk. Correct number of + // bytes should be skipped when writing the data and checksum + // buffers out to disk. 
+ long partialChunkSizeOnDisk = onDiskLen % bytesPerChecksum; + boolean alignedOnDisk = partialChunkSizeOnDisk == 0; + boolean alignedInPacket = firstByteInBlock % bytesPerChecksum == 0; + + // Since data is always appended, not overwritten, partial CRC + // recalculation is necessary if the on-disk data is not chunk- + // aligned, regardless of whether the beginning of the data in + // the packet is chunk-aligned. + boolean doPartialCrc = !alignedOnDisk && !shouldNotWriteChecksum; + + // If this is a partial chunk, then verify that this is the only + // chunk in the packet. If the starting offset is not chunk + // aligned, the packet should terminate at or before the next + // chunk boundary. + if (!alignedInPacket && len > bytesPerChecksum) { + throw new IOException("Unexpected packet data length for " + + block + " from " + inAddr + ": a partial chunk must be " + + " sent in an individual packet (data length = " + len + + " > bytesPerChecksum = " + bytesPerChecksum + ")"); } - - // If this is a partial chunk, then read in pre-existing checksum + + // If the last portion of the block file is not a full chunk, + // then read in pre-existing partial data chunk and recalculate + // the checksum so that the checksum calculation can continue + // from the right state. Checksum partialCrc = null; - if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) { + if (doPartialCrc) { if (LOG.isDebugEnabled()) { LOG.debug("receivePacket for " + block - + ": bytesPerChecksum=" + bytesPerChecksum - + " does not divide firstByteInBlock=" + firstByteInBlock); + + ": previous write did not end at the chunk boundary." + + " onDiskLen=" + onDiskLen); } long offsetInChecksum = BlockMetadataHeader.getHeaderSize() + onDiskLen / bytesPerChecksum * checksumSize; partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum); } + // The data buffer position where write will begin. 
If the packet + // data and on-disk data overlap, this will not be at the + // beginning of the buffer. int startByteToDisk = (int)(onDiskLen-firstByteInBlock) + dataBuf.arrayOffset() + dataBuf.position(); + // Actual number of data bytes to write. int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. @@ -625,31 +655,63 @@ class BlockReceiver implements Closeable { final byte[] lastCrc; if (shouldNotWriteChecksum) { lastCrc = null; - } else if (partialCrc != null) { - // If this is a partial chunk, then verify that this is the only - // chunk in the packet. Calculate new crc for this chunk. - if (len > bytesPerChecksum) { - throw new IOException("Unexpected packet data length for " - + block + " from " + inAddr + ": a partial chunk must be " - + " sent in an individual packet (data length = " + len - + " > bytesPerChecksum = " + bytesPerChecksum + ")"); + } else { + int skip = 0; + byte[] crcBytes = null; + + // First, overwrite the partial crc at the end, if necessary. + if (doPartialCrc) { // not chunk-aligned on disk + // Calculate new crc for this chunk. + int bytesToReadForRecalc = + (int)(bytesPerChecksum - partialChunkSizeOnDisk); + if (numBytesToDisk < bytesToReadForRecalc) { + bytesToReadForRecalc = numBytesToDisk; + } + + partialCrc.update(dataBuf.array(), startByteToDisk, + bytesToReadForRecalc); + byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, + checksumSize); + crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length); + // prepare to overwrite last checksum + adjustCrcFilePosition(); + checksumOut.write(buf); + if(LOG.isDebugEnabled()) { + LOG.debug("Writing out partial crc for data len " + len + + ", skip=" + skip); + } + skip++; // For the partial chunk that was just read.
} - partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk); - byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); - lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length); - checksumOut.write(buf); - if(LOG.isDebugEnabled()) { - LOG.debug("Writing out partial crc for data len " + len); + + // Determine how many checksums need to be skipped up to the last + // boundary. The checksum after the boundary was already counted + // above. Only count the number of checksums skipped up to the + // boundary here. + long lastChunkBoundary = onDiskLen - (onDiskLen%bytesPerChecksum); + long skippedDataBytes = lastChunkBoundary - firstByteInBlock; + + if (skippedDataBytes > 0) { + skip += (int)(skippedDataBytes / bytesPerChecksum) + + ((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1); } - partialCrc = null; - } else { - // write checksum + skip *= checksumSize; // Convert to number of bytes + + // write the rest of checksum final int offset = checksumBuf.arrayOffset() + - checksumBuf.position(); - final int end = offset + checksumLen; - lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize, - end); - checksumOut.write(checksumBuf.array(), offset, checksumLen); + checksumBuf.position() + skip; + final int end = offset + checksumLen - skip; + // If offset > end, there is no more checksum to write. + // I.e. a partial chunk checksum rewrite happened and there is no + // more to write after that. + if (offset > end) { + assert crcBytes != null; + lastCrc = crcBytes; + } else { + final int remainingBytes = checksumLen - skip; + lastCrc = copyLastChunkChecksum(checksumBuf.array(), + checksumSize, end); + checksumOut.write(checksumBuf.array(), offset, remainingBytes); + } } /// flush entire packet, sync if requested