HDFS-7056. Snapshot support for truncate. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/453a1754 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/453a1754 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/453a1754
Branch: refs/heads/truncate Commit: 453a175416a1f8d43b84d8ca5580fe0afba78d73 Parents: 7edc2d2 Author: Konstantin V Shvachko <s...@apache.org> Authored: Tue Dec 23 04:05:32 2014 -0800 Committer: Konstantin V Shvachko <s...@apache.org> Committed: Mon Jan 12 21:41:11 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hdfs/protocol/ClientProtocol.java | 2 +- ...rDatanodeProtocolServerSideTranslatorPB.java | 6 +- .../InterDatanodeProtocolTranslatorPB.java | 5 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 13 +- .../BlockInfoUnderConstruction.java | 20 +- .../server/blockmanagement/BlockManager.java | 5 +- .../server/blockmanagement/DatanodeManager.java | 32 +- .../hdfs/server/common/HdfsServerConstants.java | 7 - .../hadoop/hdfs/server/datanode/DataNode.java | 37 +- .../server/datanode/fsdataset/FsDatasetSpi.java | 2 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 81 ++- .../server/namenode/FSDirStatAndListingOp.java | 2 +- .../hdfs/server/namenode/FSDirectory.java | 36 +- .../hadoop/hdfs/server/namenode/FSEditLog.java | 6 +- .../hdfs/server/namenode/FSEditLogLoader.java | 3 +- .../hdfs/server/namenode/FSEditLogOp.java | 22 + .../hdfs/server/namenode/FSNamesystem.java | 203 +++++-- .../hadoop/hdfs/server/namenode/INode.java | 15 +- .../hadoop/hdfs/server/namenode/INodeFile.java | 165 ++++-- .../server/namenode/NameNodeLayoutVersion.java | 5 +- .../snapshot/AbstractINodeDiffList.java | 15 +- .../snapshot/FSImageFormatPBSnapshot.java | 23 + .../hdfs/server/namenode/snapshot/FileDiff.java | 42 +- .../server/namenode/snapshot/FileDiffList.java | 98 ++++ .../snapshot/FileWithSnapshotFeature.java | 24 +- .../server/protocol/BlockRecoveryCommand.java | 21 +- .../server/protocol/InterDatanodeProtocol.java | 3 +- .../src/main/proto/InterDatanodeProtocol.proto | 2 + .../hadoop-hdfs/src/main/proto/fsimage.proto | 1 + .../hadoop-hdfs/src/main/proto/hdfs.proto | 6 +- .../blockmanagement/TestReplicationPolicy.java | 2 +- .../server/datanode/SimulatedFSDataset.java | 1 + .../hdfs/server/datanode/TestBlockRecovery.java | 53 +- .../impl/TestInterDatanodeProtocol.java | 9 +- .../TestCommitBlockSynchronization.java | 3 + .../hdfs/server/namenode/TestFileTruncate.java | 525 +++++++++++++++++-- 36 files changed, 1226 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 749f387..cfd1c67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -537,7 +537,7 @@ public interface ClientProtocol { * @param src existing file * @param newLength the target size * - * @return true if and client does not need to wait for block recovery, + * @return true if client does not need to wait for block recovery, * false if client needs to wait for block recovery. 
* * @throws AccessControlException If access is denied http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java index 087c697..ba0a8fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java @@ -76,12 +76,12 @@ public class InterDatanodeProtocolServerSideTranslatorPB implements final String storageID; try { storageID = impl.updateReplicaUnderRecovery( - PBHelper.convert(request.getBlock()), - request.getRecoveryId(), request.getNewLength()); + PBHelper.convert(request.getBlock()), request.getRecoveryId(), + request.getNewBlockId(), request.getNewLength()); } catch (IOException e) { throw new ServiceException(e); } return UpdateReplicaUnderRecoveryResponseProto.newBuilder() .setStorageUuid(storageID).build(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index 5174d86..fee62a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -102,11 +102,12 @@ public class InterDatanodeProtocolTranslatorPB implements @Override public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, - long recoveryId, long newLength) throws IOException { + long recoveryId, long newBlockId, long newLength) throws IOException { UpdateReplicaUnderRecoveryRequestProto req = UpdateReplicaUnderRecoveryRequestProto.newBuilder() .setBlock(PBHelper.convert(oldBlock)) - .setNewLength(newLength).setRecoveryId(recoveryId).build(); + .setNewLength(newLength).setNewBlockId(newBlockId) + .setRecoveryId(recoveryId).build(); try { return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req ).getStorageUuid(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 3f6a7f3..7187838 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java 
@@ -607,16 +607,19 @@ public class PBHelper { return null; } LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b); - return RecoveringBlockProto.newBuilder().setBlock(lb) - .setNewGenStamp(b.getNewGenerationStamp()) - .setTruncateFlag(b.getTruncateFlag()).build(); + RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder(); + builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp()); + if(b.getNewBlock() != null) + builder.setTruncateBlock(PBHelper.convert(b.getNewBlock())); + return builder.build(); } public static RecoveringBlock convert(RecoveringBlockProto b) { ExtendedBlock block = convert(b.getBlock().getB()); DatanodeInfo[] locs = convert(b.getBlock().getLocsList()); - return new RecoveringBlock(block, locs, b.getNewGenStamp(), - b.getTruncateFlag()); + return (b.hasTruncateBlock()) ? + new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) : + new RecoveringBlock(block, locs, b.getNewGenStamp()); } public static DatanodeInfoProto.AdminState convert( http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java index 28b179d..8a811ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java @@ -55,6 +55,11 @@ public class BlockInfoUnderConstruction extends BlockInfo { private long blockRecoveryId = 0; /** + * The block source to use in the event of copy-on-write truncate. + */ + private Block truncateBlock; + + /** * ReplicaUnderConstruction contains information about replicas while * they are under construction. * The GS, the length and the state of the replica is as reported by @@ -229,6 +234,15 @@ public class BlockInfoUnderConstruction extends BlockInfo { return blockRecoveryId; } + /** Get recover block */ + public Block getTruncateBlock() { + return truncateBlock; + } + + public void setTruncateBlock(Block recoveryBlock) { + this.truncateBlock = recoveryBlock; + } + /** * Process the recorded replicas. When about to commit or finish the * pipeline recovery sort out bad replicas. @@ -273,11 +287,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { * make it primary. 
*/ public void initializeBlockRecovery(long recoveryId) { - initializeBlockRecovery(BlockUCState.UNDER_RECOVERY, recoveryId); - } - - public void initializeBlockRecovery(BlockUCState s, long recoveryId) { - setBlockUCState(s); + setBlockUCState(BlockUCState.UNDER_RECOVERY); blockRecoveryId = recoveryId; if (replicas.size() == 0) { NameNode.blockStateChangeLog.warn("BLOCK*" http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7e54c0f..0386107 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -700,13 +700,14 @@ public class BlockManager { * The client is supposed to allocate a new block with the next call. * * @param bc file + * @param bytesToRemove num of bytes to remove from block * @return the last block locations if the block is partial or null otherwise */ public LocatedBlock convertLastBlockToUnderConstruction( - BlockCollection bc) throws IOException { + BlockCollection bc, long bytesToRemove) throws IOException { BlockInfo oldBlock = bc.getLastBlock(); if(oldBlock == null || - bc.getPreferredBlockSize() == oldBlock.getNumBytes()) + bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove) return null; assert oldBlock == getStoredBlock(oldBlock) : "last block of the file is not in blocksMap"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 918b8d9..7ef6521 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -1433,26 +1432,37 @@ public class DatanodeManager { recoveryLocations.add(storages[i]); } } + // If we are performing a truncate recovery than set recovery fields + // to old block. 
+ boolean truncateRecovery = b.getTruncateBlock() != null; + boolean copyOnTruncateRecovery = truncateRecovery && + b.getTruncateBlock().getBlockId() != b.getBlockId(); + ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ? + new ExtendedBlock(blockPoolId, b.getTruncateBlock()) : + new ExtendedBlock(blockPoolId, b); // If we only get 1 replica after eliminating stale nodes, then choose all // replicas for recovery and let the primary data node handle failures. + DatanodeInfo[] recoveryInfos; if (recoveryLocations.size() > 1) { if (recoveryLocations.size() != storages.length) { LOG.info("Skipped stale nodes for recovery : " + (storages.length - recoveryLocations.size())); } - boolean isTruncate = b.getBlockUCState().equals( - HdfsServerConstants.BlockUCState.BEING_TRUNCATED); - brCommand.add(new RecoveringBlock( - new ExtendedBlock(blockPoolId, b), - DatanodeStorageInfo.toDatanodeInfos(recoveryLocations), - b.getBlockRecoveryId(), isTruncate)); + recoveryInfos = + DatanodeStorageInfo.toDatanodeInfos(recoveryLocations); } else { // If too many replicas are stale, then choose all replicas to participate // in block recovery. - brCommand.add(new RecoveringBlock( - new ExtendedBlock(blockPoolId, b), - DatanodeStorageInfo.toDatanodeInfos(storages), - b.getBlockRecoveryId())); + recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages); + } + if(truncateRecovery) { + Block recoveryBlock = (copyOnTruncateRecovery) ? b : + b.getTruncateBlock(); + brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, + recoveryBlock)); + } else { + brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, + b.getBlockRecoveryId())); } } return new DatanodeCommand[] { brCommand }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index f2e7ff4..9bba2c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -300,13 +300,6 @@ public final class HdfsServerConstants { */ UNDER_RECOVERY, /** - * The block is being truncated.<br> - * When a file is truncated its last block may need to be truncated - * and needs to go through a recovery procedure, - * which synchronizes the existing replicas contents. 
- */ - BEING_TRUNCATED, - /** * The block is committed.<br> * The client reported that all bytes are written to data-nodes * with the given generation stamp and block length, but no http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 7f95f33..84528e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2530,14 +2530,16 @@ public class DataNode extends ReconfigurableBase */ @Override // InterDatanodeProtocol public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock, - final long recoveryId, final long newLength) throws IOException { + final long recoveryId, final long newBlockId, final long newLength) + throws IOException { final String storageID = data.updateReplicaUnderRecovery(oldBlock, - recoveryId, newLength); + recoveryId, newBlockId, newLength); // Notify the namenode of the updated block info. This is important // for HA, since otherwise the standby node may lose track of the // block locations until the next block report. ExtendedBlock newBlock = new ExtendedBlock(oldBlock); newBlock.setGenerationStamp(recoveryId); + newBlock.setBlockId(newBlockId); newBlock.setNumBytes(newLength); notifyNamenodeReceivedBlock(newBlock, "", storageID); return storageID; @@ -2559,10 +2561,12 @@ public class DataNode extends ReconfigurableBase this.rInfo = rInfo; } - void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength - ) throws IOException { + void updateReplicaUnderRecovery(String bpid, long recoveryId, + long newBlockId, long newLength) + throws IOException { final ExtendedBlock b = new ExtendedBlock(bpid, rInfo); - storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength); + storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId, + newLength); } @Override @@ -2644,8 +2648,12 @@ public class DataNode extends ReconfigurableBase final String bpid = block.getBlockPoolId(); DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(block.getBlockPoolId()); - + long recoveryId = rBlock.getNewGenerationStamp(); + boolean isTruncateRecovery = rBlock.getNewBlock() != null; + long blockId = (isTruncateRecovery) ? 
+ rBlock.getNewBlock().getBlockId() : block.getBlockId(); + if (LOG.isDebugEnabled()) { LOG.debug("block=" + block + ", (length=" + block.getNumBytes() + "), syncList=" + syncList); @@ -2679,7 +2687,7 @@ public class DataNode extends ReconfigurableBase // Calculate list of nodes that will participate in the recovery // and the new block size List<BlockRecord> participatingList = new ArrayList<BlockRecord>(); - final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(), + final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId, -1, recoveryId); switch(bestState) { case FINALIZED: @@ -2691,10 +2699,7 @@ public class DataNode extends ReconfigurableBase r.rInfo.getNumBytes() == finalizedLength) participatingList.add(r); } - if(rBlock.getTruncateFlag()) - newBlock.setNumBytes(rBlock.getBlock().getNumBytes()); - else - newBlock.setNumBytes(finalizedLength); + newBlock.setNumBytes(finalizedLength); break; case RBW: case RWR: @@ -2706,21 +2711,21 @@ public class DataNode extends ReconfigurableBase participatingList.add(r); } } - if(rBlock.getTruncateFlag()) - newBlock.setNumBytes(rBlock.getBlock().getNumBytes()); - else - newBlock.setNumBytes(minLength); + newBlock.setNumBytes(minLength); break; case RUR: case TEMPORARY: assert false : "bad replica state: " + bestState; } + if(isTruncateRecovery) + newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes()); List<DatanodeID> failedList = new ArrayList<DatanodeID>(); final List<BlockRecord> successList = new ArrayList<BlockRecord>(); for(BlockRecord r : participatingList) { try { - r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes()); + r.updateReplicaUnderRecovery(bpid, recoveryId, blockId, + newBlock.getNumBytes()); successList.add(r); } catch (IOException e) { InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 462ad31..f2dddfd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -418,7 +418,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * @return the ID of storage that stores the block */ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, - long recoveryId, long newLength) throws IOException; + long recoveryId, long newBlockId, long newLength) throws IOException; /** * add new block pool ID http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 3eda38e..e62986f 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -670,6 +670,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File dstFile = new File(destDir, srcFile.getName()); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); + return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum); + } + + static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta, + File dstFile, boolean calculateChecksum) + throws IOException { if (calculateChecksum) { computeChecksum(srcMeta, dstMeta, srcFile); } else { @@ -2157,6 +2163,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { public synchronized String updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, + final long newBlockId, final long newlength) throws IOException { //get replica final String bpid = oldBlock.getBlockPoolId(); @@ -2189,13 +2196,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { //update replica final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock - .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength); - assert finalized.getBlockId() == oldBlock.getBlockId() - && finalized.getGenerationStamp() == recoveryId - && finalized.getNumBytes() == newlength - : "Replica information mismatched: oldBlock=" + oldBlock - + ", recoveryId=" + recoveryId + ", newlength=" + newlength - + ", finalized=" + finalized; + .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, + newBlockId, newlength); + + boolean copyTruncate = newBlockId != oldBlock.getBlockId(); + if(!copyTruncate) { + assert finalized.getBlockId() == oldBlock.getBlockId() + && finalized.getGenerationStamp() == recoveryId + && finalized.getNumBytes() == newlength + : "Replica information mismatched: oldBlock=" + oldBlock + + ", recoveryId=" + recoveryId + ", newlength=" + newlength + + ", newBlockId=" + newBlockId + ", finalized=" + finalized; + } else { + assert finalized.getBlockId() == oldBlock.getBlockId() + && finalized.getGenerationStamp() == oldBlock.getGenerationStamp() + && finalized.getNumBytes() == oldBlock.getNumBytes() + : "Finalized and old information mismatched: oldBlock=" + oldBlock + + ", genStamp=" + oldBlock.getGenerationStamp() + + ", len=" + oldBlock.getNumBytes() + + ", finalized=" + finalized; + } //check replica files after update checkReplicaFiles(finalized); @@ -2208,6 +2228,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { String bpid, ReplicaUnderRecovery rur, long recoveryId, + long newBlockId, long newlength) throws IOException { //check recovery id if (rur.getRecoveryID() != recoveryId) { @@ -2215,26 +2236,62 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { + ", rur=" + rur); } + boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId; + File blockFile; + File metaFile; // bump rur's GS to be recovery id - bumpReplicaGS(rur, recoveryId); + if(!copyOnTruncate) { + bumpReplicaGS(rur, recoveryId); + blockFile = rur.getBlockFile(); + metaFile = rur.getMetaFile(); + } else { + File[] copiedReplicaFiles = + copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId); + blockFile = copiedReplicaFiles[1]; + metaFile = copiedReplicaFiles[0]; + } //update length - final File replicafile = 
rur.getBlockFile(); if (rur.getNumBytes() < newlength) { throw new IOException("rur.getNumBytes() < newlength = " + newlength + ", rur=" + rur); } if (rur.getNumBytes() > newlength) { rur.unlinkBlock(1); - truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength); - // update RUR with the new length - rur.setNumBytes(newlength); + truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength); + if(!copyOnTruncate) { + // update RUR with the new length + rur.setNumBytes(newlength); + } else { + // Copying block to a new block with new blockId. + // Not truncating original block. + ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( + newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(), + newlength); + newReplicaInfo.setNumBytes(newlength); + volumeMap.add(bpid, newReplicaInfo); + finalizeReplica(bpid, newReplicaInfo); + } } // finalize the block return finalizeReplica(bpid, rur); } + private File[] copyReplicaWithNewBlockIdAndGS( + ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS) + throws IOException { + String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId; + FsVolumeImpl v = volumes.getNextVolume( + replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes()); + final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir(); + final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId); + final File dstBlockFile = new File(destDir, blockFileName); + final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); + return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), + dstMetaFile, dstBlockFile, true); + } + @Override // FsDatasetSpi public synchronized long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index dc0fe1f..cb3da19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -424,7 +424,7 @@ class FSDirStatAndListingOp { fileNode.computeFileSizeNotIncludingLastUcBlock() : size; loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks( - fileNode.getBlocks(), fileSize, isUc, 0L, size, false, + fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false, inSnapshot, feInfo); if (loc == null) { loc = new LocatedBlocks(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 1948099..9c33e06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1093,18 +1093,31 @@ public class FSDirectory implements Closeable { * Unlike FSNamesystem.truncate, this will not schedule block recovery. */ void unprotectedTruncate(String src, String clientName, String clientMachine, - long newLength, long mtime) + long newLength, long mtime, Block truncateBlock) throws UnresolvedLinkException, QuotaExceededException, SnapshotAccessControlException, IOException { INodesInPath iip = getINodesInPath(src, true); + INodeFile file = iip.getLastINode().asFile(); BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); boolean onBlockBoundary = unprotectedTruncate(iip, newLength, collectedBlocks, mtime); if(! onBlockBoundary) { - getFSNamesystem().prepareFileForWrite(src, - iip, clientName, clientMachine, false, false); + BlockInfo oldBlock = file.getLastBlock(); + Block tBlk = + getFSNamesystem().prepareFileForTruncate(iip, + clientName, clientMachine, file.computeFileSize() - newLength, + truncateBlock); + assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) && + tBlk.getNumBytes() == truncateBlock.getNumBytes() : + "Should be the same block."; + if(oldBlock.getBlockId() != tBlk.getBlockId() && + !file.isBlockInLatestSnapshot(oldBlock)) { + getBlockManager().removeBlockFromMap(oldBlock); + } } + assert onBlockBoundary == (truncateBlock == null) : + "truncateBlock is null iff on block boundary: " + truncateBlock; getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks); } @@ -1123,7 +1136,8 @@ public class FSDirectory implements Closeable { /** * Truncate has the following properties: * 1.) Any block deletions occur now. - * 2.) INode length is truncated now – clients can only read up to new length. + * 2.) INode length is truncated now – new clients can only read up to + * the truncated length. * 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY. * 4.) NN will trigger DN truncation recovery and waits for DNs to report. * 5.) File is considered UNDER_RECOVERY until truncation recovery completes.
@@ -1136,20 +1150,16 @@ public class FSDirectory implements Closeable { long mtime) throws IOException { assert hasWriteLock(); INodeFile file = iip.getLastINode().asFile(); + int latestSnapshot = iip.getLatestSnapshotId(); + file.recordModification(latestSnapshot, true); long oldDiskspace = file.diskspaceConsumed(); long remainingLength = file.collectBlocksBeyondMax(newLength, collectedBlocks); + file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks); file.setModificationTime(mtime); updateCount(iip, 0, file.diskspaceConsumed() - oldDiskspace, true); - // If on block boundary, then return - long lastBlockDelta = remainingLength - newLength; - if(lastBlockDelta == 0) - return true; - // Set new last block length - BlockInfo lastBlock = file.getLastBlock(); - assert lastBlock.getNumBytes() - lastBlockDelta > 0 : "wrong block size"; - lastBlock.setNumBytes(lastBlock.getNumBytes() - lastBlockDelta); - return false; + // return whether on a block boundary + return (remainingLength - newLength) == 0; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index d32aad9..144be37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -902,13 +903,14 @@ public class FSEditLog implements LogsPurgeable { * Add truncate file record to edit log */ void logTruncate(String src, String clientName, String clientMachine, - long size, long timestamp) { + long size, long timestamp, Block truncateBlock) { TruncateOp op = TruncateOp.getInstance(cache.get()) .setPath(src) .setClientName(clientName) .setClientMachine(clientMachine) .setNewLength(size) - .setTimestamp(timestamp); + .setTimestamp(timestamp) + .setTruncateBlock(truncateBlock); logEdit(op); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 2ff3b77..1f4d1a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -857,7 +857,8 @@ public class FSEditLogLoader { case OP_TRUNCATE: { TruncateOp truncateOp = (TruncateOp) op; fsDir.unprotectedTruncate(truncateOp.src, truncateOp.clientName, - 
truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp); + truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp, + truncateOp.truncateBlock); break; } case OP_SET_STORAGE_POLICY: { http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index 396fb08..9424156 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -2611,6 +2611,7 @@ public abstract class FSEditLogOp { String clientMachine; long newLength; long timestamp; + Block truncateBlock; private TruncateOp() { super(OP_TRUNCATE); @@ -2654,6 +2655,11 @@ public abstract class FSEditLogOp { return this; } + TruncateOp setTruncateBlock(Block truncateBlock) { + this.truncateBlock = truncateBlock; + return this; + } + @Override void readFields(DataInputStream in, int logVersion) throws IOException { src = FSImageSerialization.readString(in); @@ -2661,6 +2667,10 @@ public abstract class FSEditLogOp { clientMachine = FSImageSerialization.readString(in); newLength = FSImageSerialization.readLong(in); timestamp = FSImageSerialization.readLong(in); + Block[] blocks = + FSImageSerialization.readCompactBlockArray(in, logVersion); + assert blocks.length <= 1 : "Truncate op should have 1 or 0 blocks"; + truncateBlock = (blocks.length == 0) ? null : blocks[0]; } @Override @@ -2670,6 +2680,12 @@ public abstract class FSEditLogOp { FSImageSerialization.writeString(clientMachine, out); FSImageSerialization.writeLong(newLength, out); FSImageSerialization.writeLong(timestamp, out); + int size = truncateBlock != null ? 
1 : 0; + Block[] blocks = new Block[size]; + if (truncateBlock != null) { + blocks[0] = truncateBlock; + } + FSImageSerialization.writeCompactBlockArray(blocks, out); } @Override @@ -2681,6 +2697,8 @@ public abstract class FSEditLogOp { Long.toString(newLength)); XMLUtils.addSaxString(contentHandler, "TIMESTAMP", Long.toString(timestamp)); + if(truncateBlock != null) + FSEditLogOp.blockToXml(contentHandler, truncateBlock); } @Override @@ -2690,6 +2708,8 @@ public abstract class FSEditLogOp { this.clientMachine = st.getValue("CLIENTMACHINE"); this.newLength = Long.parseLong(st.getValue("NEWLENGTH")); this.timestamp = Long.parseLong(st.getValue("TIMESTAMP")); + if (st.hasChildren("BLOCK")) + this.truncateBlock = FSEditLogOp.blockFromXml(st); } @Override @@ -2705,6 +2725,8 @@ public abstract class FSEditLogOp { builder.append(newLength); builder.append(", timestamp="); builder.append(timestamp); + builder.append(", truncateBlock="); + builder.append(truncateBlock); builder.append(", opCode="); builder.append(opCode); builder.append(", txid="); http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index c250838..bcc0f5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1837,8 +1837,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip); final LocatedBlocks blocks = blockManager.createLocatedBlocks( - inode.getBlocks(), fileSize, isUc, offset, length, needBlockToken, - iip.isSnapshot(), feInfo); + inode.getBlocks(iip.getPathSnapshotId()), fileSize, + isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo); // Set caching information for the located blocks. for (LocatedBlock lb : blocks.getLocatedBlocks()) { @@ -1912,7 +1912,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * Truncation at block boundary is atomic, otherwise it requires * block recovery to truncate the last block of the file. * - * @return true if and client does not need to wait for block recovery, + * @return true if client does not need to wait for block recovery, * false if client needs to wait for block recovery. */ boolean truncate(String src, long newLength, @@ -1974,44 +1974,119 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, dir.checkPathAccess(pc, iip, FsAction.WRITE); } INodeFile file = iip.getLastINode().asFile(); - // Data will be lost after truncate occurs so it cannot support snapshots. - if(file.isInLatestSnapshot(iip.getLatestSnapshotId())) - throw new HadoopIllegalArgumentException( - "Cannot truncate file with snapshot."); // Opening an existing file for write. May need lease recovery. recoverLeaseInternal(iip, src, clientName, clientMachine, false); - // Refresh INode as the file could have been closed - iip = dir.getINodesInPath4Write(src, true); file = INodeFile.valueOf(iip.getLastINode(), src); // Truncate length check. 
long oldLength = file.computeFileSize(); - if(oldLength == newLength) + if(oldLength == newLength) { return true; - if(oldLength < newLength) + } + if(oldLength < newLength) { throw new HadoopIllegalArgumentException( "Cannot truncate to a larger file size. Current size: " + oldLength + ", truncate size: " + newLength + "."); + } // Perform INodeFile truncation. BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); boolean onBlockBoundary = dir.truncate(iip, newLength, collectedBlocks, mtime); - + Block truncateBlock = null; if(! onBlockBoundary) { // Open file for write, but don't log into edits - prepareFileForWrite(src, iip, clientName, clientMachine, false, false); - file = INodeFile.valueOf(dir.getINode4Write(src), src); - initializeBlockRecovery(file); + long lastBlockDelta = file.computeFileSize() - newLength; + assert lastBlockDelta > 0 : "delta is 0 only if on block bounday"; + truncateBlock = prepareFileForTruncate(iip, clientName, clientMachine, + lastBlockDelta, null); } - getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime); + getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime, + truncateBlock); removeBlocks(collectedBlocks); return onBlockBoundary; } - void initializeBlockRecovery(INodeFile inodeFile) throws IOException { - BlockInfo lastBlock = inodeFile.getLastBlock(); - long recoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(lastBlock)); - ((BlockInfoUnderConstruction)lastBlock).initializeBlockRecovery( - BlockUCState.BEING_TRUNCATED, recoveryId); + /** + * Convert current INode to UnderConstruction. + * Recreate lease. + * Create new block for the truncated copy. + * Schedule truncation of the replicas. + * + * @return the returned block will be written to editLog and passed back into + * this method upon loading. + */ + Block prepareFileForTruncate(INodesInPath iip, + String leaseHolder, + String clientMachine, + long lastBlockDelta, + Block newBlock) + throws IOException { + INodeFile file = iip.getLastINode().asFile(); + String src = iip.getPath(); + file.recordModification(iip.getLatestSnapshotId()); + file.toUnderConstruction(leaseHolder, clientMachine); + assert file.isUnderConstruction() : "inode should be under construction."; + leaseManager.addLease( + file.getFileUnderConstructionFeature().getClientName(), src); + boolean shouldRecoverNow = (newBlock == null); + BlockInfo oldBlock = file.getLastBlock(); + boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock); + if(newBlock == null) { + newBlock = (shouldCopyOnTruncate) ? 
createNewBlock() : + new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(), + nextGenerationStamp(blockIdManager.isLegacyBlock(oldBlock))); + } + + BlockInfoUnderConstruction truncatedBlockUC; + if(shouldCopyOnTruncate) { + // Add new truncateBlock into blocksMap and + // use oldBlock as a source for copy-on-truncate recovery + truncatedBlockUC = new BlockInfoUnderConstruction(newBlock, + file.getBlockReplication()); + truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); + truncatedBlockUC.setTruncateBlock(oldBlock); + file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock)); + getBlockManager().addBlockCollection(truncatedBlockUC, file); + + NameNode.stateChangeLog.info("BLOCK* prepareFileForTruncate: " + + "Scheduling copy-on-truncate to new size " + + truncatedBlockUC.getNumBytes() + " new block " + newBlock + + " old block " + truncatedBlockUC.getTruncateBlock()); + } else { + // Use new generation stamp for in-place truncate recovery + blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta); + oldBlock = file.getLastBlock(); + assert !oldBlock.isComplete() : "oldBlock should be under construction"; + truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock; + truncatedBlockUC.setTruncateBlock(new Block(oldBlock)); + truncatedBlockUC.getTruncateBlock().setNumBytes( + oldBlock.getNumBytes() - lastBlockDelta); + truncatedBlockUC.getTruncateBlock().setGenerationStamp( + newBlock.getGenerationStamp()); + + NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: " + + "Scheduling in-place block truncate to new size " + + truncatedBlockUC.getTruncateBlock().getNumBytes() + + " block=" + truncatedBlockUC); + } + if(shouldRecoverNow) + truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp()); + + // update the quota: use the preferred block size for UC block + final long diff = + file.getPreferredBlockSize() - truncatedBlockUC.getNumBytes(); + dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication()); + return newBlock; + } + + /** + * Defines if a replica needs to be copied on truncate or + * can be truncated in place. 
+ */ + boolean shouldCopyOnTruncate(INodeFile file, BlockInfo blk) { + if(!isUpgradeFinalized()) { + return true; + } + return file.isBlockInLatestSnapshot(blk); } /** @@ -2598,7 +2673,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, leaseManager.addLease( file.getFileUnderConstructionFeature().getClientName(), src); - LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(file); + LocatedBlock ret = + blockManager.convertLastBlockToUnderConstruction(file, 0); if (ret != null) { // update the quota: use the preferred block size for UC block final long diff = file.getPreferredBlockSize() - ret.getBlockSize(); @@ -2661,7 +2737,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return false; } - private void recoverLeaseInternal(INodesInPath iip, + void recoverLeaseInternal(INodesInPath iip, String src, String holder, String clientMachine, boolean force) throws IOException { assert hasWriteLock(); @@ -2723,8 +2799,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } else { final BlockInfo lastBlock = file.getLastBlock(); if (lastBlock != null - && (lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY || - lastBlock.getBlockUCState() == BlockUCState.BEING_TRUNCATED)) { + && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) { throw new RecoveryInProgressException("Recovery in progress, file [" + src + "], " + "lease owner [" + lease.getHolder() + "]"); } else { @@ -3942,8 +4017,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, throw new AlreadyBeingCreatedException(message); case UNDER_CONSTRUCTION: case UNDER_RECOVERY: - case BEING_TRUNCATED: final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock; + // determine if last block was intended to be truncated + Block recoveryBlock = uc.getTruncateBlock(); + boolean truncateRecovery = recoveryBlock != null; + boolean copyOnTruncate = truncateRecovery && + recoveryBlock.getBlockId() != uc.getBlockId(); + assert !copyOnTruncate || + recoveryBlock.getBlockId() < uc.getBlockId() && + recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() && + recoveryBlock.getNumBytes() > uc.getNumBytes() : + "wrong recoveryBlock"; + // setup the last block locations from the blockManager if not known if (uc.getNumExpectedLocations() == 0) { uc.setExpectedLocations(blockManager.getStorages(lastBlock)); @@ -3964,9 +4049,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // start recovery of the last block for this file long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc)); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); - if (uc.getBlockUCState() != BlockUCState.BEING_TRUNCATED) { - uc.initializeBlockRecovery(blockRecoveryId); + if(copyOnTruncate) { + uc.setGenerationStamp(blockRecoveryId); + } else if(truncateRecovery) { + recoveryBlock.setGenerationStamp(blockRecoveryId); } + uc.initializeBlockRecovery(blockRecoveryId); leaseManager.renewLease(lease); // Cannot close file right now, since the last block requires recovery. 
// This may potentially cause infinite loop in lease recovery @@ -4076,11 +4164,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return true; } - void commitBlockSynchronization(ExtendedBlock lastblock, + void commitBlockSynchronization(ExtendedBlock oldBlock, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages) throws IOException { - LOG.info("commitBlockSynchronization(lastblock=" + lastblock + LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock + ", newgenerationstamp=" + newgenerationstamp + ", newlength=" + newlength + ", newtargets=" + Arrays.asList(newtargets) @@ -4099,17 +4187,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, checkNameNodeSafeMode( "Cannot commitBlockSynchronization while in safe mode"); final BlockInfo storedBlock = getStoredBlock( - ExtendedBlock.getLocalBlock(lastblock)); + ExtendedBlock.getLocalBlock(oldBlock)); if (storedBlock == null) { if (deleteblock) { // This may be a retry attempt so ignore the failure // to locate the block. if (LOG.isDebugEnabled()) { - LOG.debug("Block (=" + lastblock + ") not found"); + LOG.debug("Block (=" + oldBlock + ") not found"); } return; } else { - throw new IOException("Block (=" + lastblock + ") not found"); + throw new IOException("Block (=" + oldBlock + ") not found"); } } // @@ -4136,34 +4224,40 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, + iFile.getFullPathName() + ", likely due to delayed block" + " removal"); } - if (!iFile.isUnderConstruction() || storedBlock.isComplete()) { + if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) && + iFile.getLastBlock().isComplete()) { if (LOG.isDebugEnabled()) { - LOG.debug("Unexpected block (=" + lastblock + LOG.debug("Unexpected block (=" + oldBlock + ") since the file (=" + iFile.getLocalName() + ") is not under construction"); } return; } - long recoveryId = - ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId(); + BlockInfoUnderConstruction truncatedBlock = + (BlockInfoUnderConstruction) iFile.getLastBlock(); + long recoveryId = truncatedBlock.getBlockRecoveryId(); + boolean copyTruncate = + truncatedBlock.getBlockId() != storedBlock.getBlockId(); if(recoveryId != newgenerationstamp) { throw new IOException("The recovery id " + newgenerationstamp + " does not match current recovery id " - + recoveryId + " for block " + lastblock); + + recoveryId + " for block " + oldBlock); } if (deleteblock) { - Block blockToDel = ExtendedBlock.getLocalBlock(lastblock); + Block blockToDel = ExtendedBlock.getLocalBlock(oldBlock); boolean remove = iFile.removeLastBlock(blockToDel); if (remove) { - blockManager.removeBlockFromMap(storedBlock); + blockManager.removeBlock(storedBlock); } } else { // update last block - storedBlock.setGenerationStamp(newgenerationstamp); - storedBlock.setNumBytes(newlength); + if(!copyTruncate) { + storedBlock.setGenerationStamp(newgenerationstamp); + storedBlock.setNumBytes(newlength); + } // find the DatanodeDescriptor objects // There should be no locations in the blockManager till now because the @@ -4193,7 +4287,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DatanodeStorageInfo storageInfo = trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i)); if (storageInfo != null) { - storageInfo.addBlock(storedBlock); + if(copyTruncate) { + storageInfo.addBlock(truncatedBlock); + } else { + storageInfo.addBlock(storedBlock); + } } } } @@ -4203,11 +4301,22 
@@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockManager.getDatanodeManager().getDatanodeStorageInfos( trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]), trimmedStorages.toArray(new String[trimmedStorages.size()])); - iFile.setLastBlock(storedBlock, trimmedStorageInfos); + if(copyTruncate) { + iFile.setLastBlock(truncatedBlock, trimmedStorageInfos); + } else { + iFile.setLastBlock(storedBlock, trimmedStorageInfos); + } } if (closeFile) { - src = closeFileCommitBlocks(iFile, storedBlock); + if(copyTruncate) { + src = closeFileCommitBlocks(iFile, truncatedBlock); + if(!iFile.isBlockInLatestSnapshot(storedBlock)) { + blockManager.removeBlock(storedBlock); + } + } else { + src = closeFileCommitBlocks(iFile, storedBlock); + } } else { // If this commit does not want to close the file, persist blocks src = iFile.getFullPathName(); @@ -4218,13 +4327,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } getEditLog().logSync(); if (closeFile) { - LOG.info("commitBlockSynchronization(newblock=" + lastblock + LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock + ", file=" + src + ", newgenerationstamp=" + newgenerationstamp + ", newlength=" + newlength + ", newtargets=" + Arrays.asList(newtargets) + ") successful"); } else { - LOG.info("commitBlockSynchronization(" + lastblock + ") successful"); + LOG.info("commitBlockSynchronization(" + oldBlock + ") successful"); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index 41b2391..58ef536 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -228,7 +228,8 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> { /** Is this inode in the latest snapshot? */ public final boolean isInLatestSnapshot(final int latestSnapshotId) { - if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) { + if (latestSnapshotId == Snapshot.CURRENT_STATE_ID || + latestSnapshotId == Snapshot.NO_SNAPSHOT_ID) { return false; } // if parent is a reference node, parent must be a renamed node. 
We can @@ -817,11 +818,15 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> { * @param toDelete the to-be-deleted block */ public void addDeleteBlock(Block toDelete) { - if (toDelete != null) { - toDeleteList.add(toDelete); - } + assert toDelete != null : "toDelete is null"; + toDeleteList.add(toDelete); } - + + public void removeDeleteBlock(Block block) { + assert block != null : "block is null"; + toDeleteList.remove(block); + } + /** * Clear {@link BlocksMapUpdateInfo#toDeleteList} */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index d1ff2f7..64887e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -24,7 +24,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -304,6 +306,11 @@ public class INodeFile extends INodeWithAdditionalFields @Override public void recordModification(final int latestSnapshotId) throws QuotaExceededException { + recordModification(latestSnapshotId, false); + } + + public void recordModification(final int latestSnapshotId, boolean withBlocks) + throws QuotaExceededException { if (isInLatestSnapshot(latestSnapshotId) && !shouldRecordInSrcSnapshot(latestSnapshotId)) { // the file is in snapshot, create a snapshot feature if it does not have @@ -312,10 +319,10 @@ public class INodeFile extends INodeWithAdditionalFields sf = addSnapshotFeature(null); } // record self in the diff list if necessary - sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null); + sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null, withBlocks); } } - + public FileDiffList getDiffs() { FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature(); if (sf != null) { @@ -415,6 +422,20 @@ public class INodeFile extends INodeWithAdditionalFields return this.blocks; } + /** @return blocks of the file corresponding to the snapshot. */ + public BlockInfo[] getBlocks(int snapshot) { + if(snapshot == CURRENT_STATE_ID || getDiffs() == null) + return getBlocks(); + FileDiff diff = getDiffs().getDiffById(snapshot); + BlockInfo[] snapshotBlocks = diff == null ? getBlocks() : diff.getBlocks(); + if(snapshotBlocks != null) + return snapshotBlocks; + // Blocks are not in the current snapshot + // Find next snapshot with blocks present or return current file blocks + snapshotBlocks = getDiffs().findLaterSnapshotBlocks(diff.getSnapshotId()); + return (snapshotBlocks == null) ? 
getBlocks() : snapshotBlocks; + } + void updateBlockCollection() { if (blocks != null) { for(BlockInfo b : blocks) { @@ -509,13 +530,13 @@ public class INodeFile extends INodeWithAdditionalFields } clear(); removedINodes.add(this); - FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); if (sf != null) { + sf.getDiffs().destroyAndCollectSnapshotBlocks(collectedBlocks); sf.clearDiffs(); } } - + @Override public String getName() { // Get the full path name of this inode. @@ -554,39 +575,23 @@ public class INodeFile extends INodeWithAdditionalFields @Override public final ContentSummaryComputationContext computeContentSummary( final ContentSummaryComputationContext summary) { - computeContentSummary4Snapshot(summary.getCounts()); - computeContentSummary4Current(summary.getCounts()); - return summary; - } - - private void computeContentSummary4Snapshot(final Content.Counts counts) { - // file length and diskspace only counted for the latest state of the file - // i.e. either the current state or the last snapshot + final Content.Counts counts = summary.getCounts(); FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); - if (sf != null) { + if (sf == null) { + counts.add(Content.LENGTH, computeFileSize()); + counts.add(Content.FILE, 1); + } else { final FileDiffList diffs = sf.getDiffs(); final int n = diffs.asList().size(); counts.add(Content.FILE, n); if (n > 0 && sf.isCurrentFileDeleted()) { counts.add(Content.LENGTH, diffs.getLast().getFileSize()); - } - - if (sf.isCurrentFileDeleted()) { - final long lastFileSize = diffs.getLast().getFileSize(); - counts.add(Content.DISKSPACE, lastFileSize * getBlockReplication()); + } else { + counts.add(Content.LENGTH, computeFileSize()); } } - } - - private void computeContentSummary4Current(final Content.Counts counts) { - FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature(); - if (sf != null && sf.isCurrentFileDeleted()) { - return; - } - - counts.add(Content.LENGTH, computeFileSize()); - counts.add(Content.FILE, 1); counts.add(Content.DISKSPACE, diskspaceConsumed()); + return summary; } /** The same as computeFileSize(null). */ @@ -651,9 +656,36 @@ public class INodeFile extends INodeWithAdditionalFields return size; } + /** + * Compute size consumed by all blocks of the current file, + * including blocks in its snapshots. + * Use preferred block size for the last block if it is under construction. 
+ */ public final long diskspaceConsumed() { - // use preferred block size for the last block if it is under construction - return computeFileSize(true, true) * getBlockReplication(); + FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); + if(sf == null) { + return computeFileSize(true, true) * getBlockReplication(); + } + + // Collect all distinct blocks + long size = 0; + Set<Block> allBlocks = new HashSet<Block>(Arrays.asList(getBlocks())); + List<FileDiff> diffs = sf.getDiffs().asList(); + for(FileDiff diff : diffs) { + BlockInfo[] diffBlocks = diff.getBlocks(); + if (diffBlocks != null) { + allBlocks.addAll(Arrays.asList(diffBlocks)); + } + } + for(Block block : allBlocks) { + size += block.getNumBytes(); + } + // check if the last block is under construction + BlockInfo lastBlock = getLastBlock(); + if(lastBlock != null && lastBlock instanceof BlockInfoUnderConstruction) { + size += getPreferredBlockSize() - lastBlock.getNumBytes(); + } + return size * getBlockReplication(); } public final long diskspaceConsumed(int lastSnapshotId) { @@ -706,7 +738,7 @@ public class INodeFile extends INodeWithAdditionalFields final BlockInfo[] oldBlocks = getBlocks(); if (oldBlocks == null) return 0; - //find the minimum n such that the size of the first n blocks > max + // find the minimum n such that the size of the first n blocks > max int n = 0; long size = 0; for(; n < oldBlocks.length && max > size; n++) { @@ -716,23 +748,78 @@ public class INodeFile extends INodeWithAdditionalFields return size; // starting from block n, the data is beyond max. - // resize the array. + // resize the array. + truncateBlocksTo(n); + + // collect the blocks beyond max + if (collectedBlocks != null) { + for(; n < oldBlocks.length; n++) { + collectedBlocks.addDeleteBlock(oldBlocks[n]); + } + } + return size; + } + + void truncateBlocksTo(int n) { final BlockInfo[] newBlocks; if (n == 0) { newBlocks = BlockInfo.EMPTY_ARRAY; } else { newBlocks = new BlockInfo[n]; - System.arraycopy(oldBlocks, 0, newBlocks, 0, n); + System.arraycopy(getBlocks(), 0, newBlocks, 0, n); } // set new blocks setBlocks(newBlocks); + } - // collect the blocks beyond max - if (collectedBlocks != null) { - for(; n < oldBlocks.length; n++) { - collectedBlocks.addDeleteBlock(oldBlocks[n]); - } + public void collectBlocksBeyondSnapshot(BlockInfo[] snapshotBlocks, + BlocksMapUpdateInfo collectedBlocks) { + BlockInfo[] oldBlocks = getBlocks(); + if(snapshotBlocks == null || oldBlocks == null) + return; + // Skip blocks in common between the file and the snapshot + int n = 0; + while(n < oldBlocks.length && n < snapshotBlocks.length && + oldBlocks[n] == snapshotBlocks[n]) { + n++; } - return size; + truncateBlocksTo(n); + // Collect the remaining blocks of the file + while(n < oldBlocks.length) { + collectedBlocks.addDeleteBlock(oldBlocks[n++]); + } + } + + /** Exclude blocks collected for deletion that belong to a snapshot. 
*/ + void excludeSnapshotBlocks(int snapshotId, + BlocksMapUpdateInfo collectedBlocks) { + if(collectedBlocks == null || collectedBlocks.getToDeleteList().isEmpty()) + return; + FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); + if(sf == null) + return; + BlockInfo[] snapshotBlocks = + getDiffs().findEarlierSnapshotBlocks(snapshotId); + if(snapshotBlocks == null) + return; + List<Block> toDelete = collectedBlocks.getToDeleteList(); + for(Block blk : snapshotBlocks) { + if(toDelete.contains(blk)) + collectedBlocks.removeDeleteBlock(blk); + } + } + + /** + * @return true if the block is contained in a snapshot or false otherwise. + */ + boolean isBlockInLatestSnapshot(BlockInfo block) { + FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature(); + if (sf == null || sf.getDiffs() == null) + return false; + BlockInfo[] snapshotBlocks = + getDiffs().findEarlierSnapshotBlocks(getDiffs().getLastSnapshotId()); + if(snapshotBlocks == null) + return false; + return Arrays.asList(snapshotBlocks).contains(block); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java index 512913b..d742c6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java @@ -69,8 +69,9 @@ public class NameNodeLayoutVersion { CREATE_OVERWRITE(-58, "Use single editlog record for " + "creating file with overwrite"), XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"), - BLOCK_STORAGE_POLICY(-60, "Block Storage policy"); - + BLOCK_STORAGE_POLICY(-60, "Block Storage policy"), + TRUNCATE(-61, "Truncate"); + private final FeatureInfo info; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java index d918495..b330215 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java @@ -163,9 +163,12 @@ abstract class AbstractINodeDiffList<N extends INode, * id, otherwise <=. * @return The id of the latest snapshot before the given snapshot. 
*/ - private final int getPrior(int anchorId, boolean exclusive) { + public final int getPrior(int anchorId, boolean exclusive) { if (anchorId == Snapshot.CURRENT_STATE_ID) { - return getLastSnapshotId(); + int last = getLastSnapshotId(); + if(exclusive && last == anchorId) + return Snapshot.NO_SNAPSHOT_ID; + return last; } final int i = Collections.binarySearch(diffs, anchorId); if (exclusive) { // must be the one before @@ -290,10 +293,11 @@ abstract class AbstractINodeDiffList<N extends INode, } /** Save the snapshot copy to the latest snapshot. */ - public void saveSelf2Snapshot(int latestSnapshotId, N currentINode, + public D saveSelf2Snapshot(int latestSnapshotId, N currentINode, A snapshotCopy) throws QuotaExceededException { + D diff = null; if (latestSnapshotId != Snapshot.CURRENT_STATE_ID) { - D diff = checkAndAddLatestSnapshotDiff(latestSnapshotId, currentINode); + diff = checkAndAddLatestSnapshotDiff(latestSnapshotId, currentINode); if (diff.snapshotINode == null) { if (snapshotCopy == null) { snapshotCopy = createSnapshotCopy(currentINode); @@ -301,6 +305,7 @@ abstract class AbstractINodeDiffList<N extends INode, diff.saveSnapshotCopy(snapshotCopy); } } + return diff; } @Override @@ -312,4 +317,4 @@ abstract class AbstractINodeDiffList<N extends INode, public String toString() { return getClass().getSimpleName() + ": " + diffs; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java index 86ba03d..7ac85cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java @@ -36,6 +36,10 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.namenode.AclEntryStatusFormat; import org.apache.hadoop.hdfs.server.namenode.AclFeature; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; @@ -229,6 +233,20 @@ public class FSImageFormatPBSnapshot { FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null, pbf.getFileSize()); + List<BlockProto> bpl = pbf.getBlocksList(); + BlockInfo[] blocks = new BlockInfo[bpl.size()]; + for(int j = 0, e = bpl.size(); j < e; ++j) { + Block blk = PBHelper.convert(bpl.get(j)); + BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk); + if(storedBlock == null) { + storedBlock = fsn.getBlockManager().addBlockCollection( + new BlockInfo(blk, copy.getFileReplication()), file); + } + blocks[j] = storedBlock; + } + if(blocks.length > 0) { + diff.setBlocks(blocks); + } diffs.addFirst(diff); } file.addSnapshotFeature(diffs); @@ -472,6 +490,11 @@ public class FSImageFormatPBSnapshot { 
SnapshotDiffSection.FileDiff.Builder fb = SnapshotDiffSection.FileDiff .newBuilder().setSnapshotId(diff.getSnapshotId()) .setFileSize(diff.getFileSize()); + if(diff.getBlocks() != null) { + for(Block block : diff.getBlocks()) { + fb.addBlocks(PBHelper.convert(block)); + } + } INodeFileAttributes copy = diff.snapshotINode; if (copy != null) { fb.setName(ByteString.copyFrom(copy.getLocalNameBytes())) http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java index 919ab56..7b52dc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -37,10 +39,13 @@ public class FileDiff extends /** The file size at snapshot creation time. */ private final long fileSize; + /** A copy of the INodeFile block list. Used in truncate. */ + private BlockInfo[] blocks; FileDiff(int snapshotId, INodeFile file) { super(snapshotId, null, null); fileSize = file.computeFileSize(); + blocks = null; } /** Constructor used by FSImage loading */ @@ -48,20 +53,40 @@ public class FileDiff extends FileDiff posteriorDiff, long fileSize) { super(snapshotId, snapshotINode, posteriorDiff); this.fileSize = fileSize; + blocks = null; } /** @return the file size in the snapshot. */ public long getFileSize() { return fileSize; } - + + /** + * Copy block references into the snapshot + * up to the current {@link #fileSize}. + * Should be done only once. 
+ */ + public void setBlocks(BlockInfo[] blocks) { + if(this.blocks != null) + return; + int numBlocks = 0; + for(long s = 0; numBlocks < blocks.length && s < fileSize; numBlocks++) + s += blocks[numBlocks].getNumBytes(); + this.blocks = Arrays.copyOf(blocks, numBlocks); + } + + public BlockInfo[] getBlocks() { + return blocks; + } + @Override Quota.Counts combinePosteriorAndCollectBlocks(INodeFile currentINode, FileDiff posterior, BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) { - return currentINode.getFileWithSnapshotFeature() - .updateQuotaAndCollectBlocks(currentINode, posterior, collectedBlocks, - removedINodes); + FileWithSnapshotFeature sf = currentINode.getFileWithSnapshotFeature(); + assert sf != null : "FileWithSnapshotFeature is null"; + return sf.updateQuotaAndCollectBlocks( + currentINode, posterior, collectedBlocks, removedINodes); } @Override @@ -91,4 +116,13 @@ public class FileDiff extends .updateQuotaAndCollectBlocks(currentINode, this, collectedBlocks, removedINodes); } + + public void destroyAndCollectSnapshotBlocks( + BlocksMapUpdateInfo collectedBlocks) { + if(blocks == null || collectedBlocks == null) + return; + for(BlockInfo blk : blocks) + collectedBlocks.addDeleteBlock(blk); + blocks = null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java index b0a973d..07652f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.hdfs.server.namenode.snapshot; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes; @@ -33,4 +40,95 @@ public class FileDiffList extends INodeFileAttributes createSnapshotCopy(INodeFile currentINode) { return new INodeFileAttributes.SnapshotCopy(currentINode); } + + public void destroyAndCollectSnapshotBlocks( + BlocksMapUpdateInfo collectedBlocks) { + for(FileDiff d : asList()) + d.destroyAndCollectSnapshotBlocks(collectedBlocks); + } + + public void saveSelf2Snapshot(int latestSnapshotId, INodeFile iNodeFile, + INodeFileAttributes snapshotCopy, boolean withBlocks) + throws QuotaExceededException { + final FileDiff diff = + super.saveSelf2Snapshot(latestSnapshotId, iNodeFile, snapshotCopy); + if(withBlocks) // Store blocks if this is the first update + diff.setBlocks(iNodeFile.getBlocks()); + } + + public BlockInfo[] findEarlierSnapshotBlocks(int snapshotId) { + assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id"; + if(snapshotId == Snapshot.CURRENT_STATE_ID) { + return null; + } + List<FileDiff> diffs = this.asList(); + int i = Collections.binarySearch(diffs, snapshotId); 
+ BlockInfo[] blocks = null; + for(i = i >= 0 ? i : -i; i < diffs.size(); i--) { + blocks = diffs.get(i).getBlocks(); + if(blocks != null) { + break; + } + } + return blocks; + } + + public BlockInfo[] findLaterSnapshotBlocks(int snapshotId) { + assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id"; + if(snapshotId == Snapshot.CURRENT_STATE_ID) { + return null; + } + List<FileDiff> diffs = this.asList(); + int i = Collections.binarySearch(diffs, snapshotId); + BlockInfo[] blocks = null; + for(i = i >= 0 ? i+1 : -i-1; i < diffs.size(); i++) { + blocks = diffs.get(i).getBlocks(); + if(blocks != null) { + break; + } + } + return blocks; + } + + /** + * Copy blocks from the removed snapshot into the previous snapshot + * up to the file length of the latter. + * Collect unused blocks of the removed snapshot. + */ + void combineAndCollectSnapshotBlocks(INodeFile file, + FileDiff removed, + BlocksMapUpdateInfo collectedBlocks, + List<INode> removedINodes) { + BlockInfo[] removedBlocks = removed.getBlocks(); + if(removedBlocks == null) { + FileWithSnapshotFeature sf = file.getFileWithSnapshotFeature(); + assert sf != null : "FileWithSnapshotFeature is null"; + if(sf.isCurrentFileDeleted()) + sf.collectBlocksAndClear(file, collectedBlocks, removedINodes); + return; + } + int p = getPrior(removed.getSnapshotId(), true); + FileDiff earlierDiff = p == Snapshot.NO_SNAPSHOT_ID ? null : getDiffById(p); + // Copy blocks to the previous snapshot if not set already + if(earlierDiff != null) + earlierDiff.setBlocks(removedBlocks); + BlockInfo[] earlierBlocks = + (earlierDiff == null ? new BlockInfo[]{} : earlierDiff.getBlocks()); + // Find later snapshot (or file itself) with blocks + BlockInfo[] laterBlocks = findLaterSnapshotBlocks(removed.getSnapshotId()); + laterBlocks = (laterBlocks==null) ? 
file.getBlocks() : laterBlocks; + // Skip blocks, which belong to either the earlier or the later lists + int i = 0; + for(; i < removedBlocks.length; i++) { + if(i < earlierBlocks.length && removedBlocks[i] == earlierBlocks[i]) + continue; + if(i < laterBlocks.length && removedBlocks[i] == laterBlocks[i]) + continue; + break; + } + // Collect the remaining blocks of the file + while(i < removedBlocks.length) { + collectedBlocks.addDeleteBlock(removedBlocks[i++]); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/453a1754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java index 16f534f..e348231 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.namenode.AclFeature; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -154,18 +155,19 @@ public class FileWithSnapshotFeature implements INode.Feature { AclStorage.removeAclFeature(aclFeature); } } - - collectBlocksAndClear(file, collectedBlocks, removedINodes); - + + getDiffs().combineAndCollectSnapshotBlocks( + file, removed, collectedBlocks, removedINodes); + long dsDelta = oldDiskspace - file.diskspaceConsumed(); return Quota.Counts.newInstance(0, dsDelta); } - + /** * If some blocks at the end of the block list no longer belongs to * any inode, collect them and update the block list. */ - private void collectBlocksAndClear(final INodeFile file, + public void collectBlocksAndClear(final INodeFile file, final BlocksMapUpdateInfo info, final List<INode> removedINodes) { // check if everything is deleted. if (isCurrentFileDeleted() && getDiffs().asList().isEmpty()) { @@ -174,13 +176,19 @@ public class FileWithSnapshotFeature implements INode.Feature { } // find max file size. final long max; + FileDiff diff = getDiffs().getLast(); if (isCurrentFileDeleted()) { - final FileDiff last = getDiffs().getLast(); - max = last == null? 0: last.getFileSize(); + max = diff == null? 0: diff.getFileSize(); } else { max = file.computeFileSize(); } - file.collectBlocksBeyondMax(max, info); + // Collect blocks that should be deleted + FileDiff last = diffs.getLast(); + BlockInfo[] snapshotBlocks = last == null ? null : last.getBlocks(); + if(snapshotBlocks == null) + file.collectBlocksBeyondMax(max, info); + else + file.collectBlocksBeyondSnapshot(snapshotBlocks, info); } }
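Editorial note (not part of the patch): the INodeFile.getBlocks(int snapshot) and FileDiffList lookup changes above share one idea: a file diff only records a block list when truncate changed it, so a snapshot reader falls back to the first later diff that recorded blocks, and finally to the live file's blocks. The following is a simplified, self-contained model of that lookup; class and method names (SnapshotBlockLookupModel, Diff, blocksAt) are invented for illustration and are not the HDFS classes.

import java.util.Arrays;
import java.util.List;

class SnapshotBlockLookupModel {
  static final class Diff {
    final int snapshotId;
    final long[] blocks; // null when this diff did not record a block list
    Diff(int snapshotId, long[] blocks) {
      this.snapshotId = snapshotId;
      this.blocks = blocks;
    }
  }

  /**
   * Diffs are ordered by snapshotId ascending, as in FileDiffList.
   * Returns the block list visible in the given snapshot: the snapshot's own
   * diff if it recorded blocks, otherwise the first later diff that did,
   * otherwise the live file's blocks.
   */
  static long[] blocksAt(List<Diff> diffs, long[] liveBlocks, int snapshotId) {
    for (int j = firstAtOrAfter(diffs, snapshotId); j >= 0 && j < diffs.size(); j++) {
      if (diffs.get(j).blocks != null) {
        return diffs.get(j).blocks;
      }
    }
    return liveBlocks;
  }

  private static int firstAtOrAfter(List<Diff> diffs, int snapshotId) {
    for (int j = 0; j < diffs.size(); j++) {
      if (diffs.get(j).snapshotId >= snapshotId) {
        return j;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // s1 taken before any truncate (no blocks recorded); a truncate while s2
    // was the latest snapshot saved the pre-truncate block list into s2's diff.
    List<Diff> diffs = Arrays.asList(
        new Diff(1, null),
        new Diff(2, new long[]{101L, 102L}));
    long[] live = {101L}; // live file truncated down to one block
    System.out.println(blocksAt(diffs, live, 1).length); // 2, via s2's recorded list
    System.out.println(blocksAt(diffs, live, 2).length); // 2
  }
}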
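For readers following the TestFileTruncate changes, here is a minimal client-side sketch (also not part of this commit) of the behavior the patch enables: truncating a file that is captured in a snapshot shrinks the live file while the snapshot copy stays readable at its original length. It assumes the FileSystem-level truncate API this work adds is available, that the directory has been made snapshottable (hdfs dfsadmin -allowSnapshot), and that the paths, sizes and snapshot name are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TruncateWithSnapshotExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    Path dir = new Path("/snaptest");        // assumed snapshottable
    Path file = new Path(dir, "data.bin");   // assumed to already exist

    long originalLength = fs.getFileStatus(file).getLen();

    // The snapshot pins the file at its current length and block list.
    fs.createSnapshot(dir, "s0");

    // Truncate the live file to half its size. A return value of false means
    // the last block still needs recovery and the caller should wait before
    // reopening the file for append.
    boolean noRecoveryNeeded = fs.truncate(file, originalLength / 2);
    System.out.println("no recovery needed: " + noRecoveryNeeded);

    // The live path reflects the new length; the snapshot path still reports
    // the original length and keeps its blocks alive.
    Path inSnapshot = new Path(dir, ".snapshot/s0/data.bin");
    System.out.println("live length:     " + fs.getFileStatus(file).getLen());
    System.out.println("snapshot length: " + fs.getFileStatus(inSnapshot).getLen());
  }
}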