[partial-ns] Implement getAdditionalBlock().
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a36f5bc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a36f5bc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a36f5bc Branch: refs/heads/feature-HDFS-8286 Commit: 6a36f5bcc4edfba4e9779b0fd19923179138b551 Parents: e9c9c72b Author: Haohui Mai <whe...@apache.org> Authored: Thu May 14 17:02:18 2015 -0700 Committer: Haohui Mai <whe...@apache.org> Committed: Fri Jun 12 13:56:57 2015 -0700 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 109 +++++--- .../hdfs/server/blockmanagement/BlocksMap.java | 8 +- .../hdfs/server/namenode/FSDirWriteFileOp.java | 247 ++++++++++--------- .../hadoop/hdfs/server/namenode/FSEditLog.java | 11 +- .../hdfs/server/namenode/FSNamesystem.java | 73 +++++- .../hdfs/server/namenode/RWTransaction.java | 4 + .../hadoop/hdfs/server/namenode/Resolver.java | 4 + 7 files changed, 298 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 6139e37..9b18f45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -64,6 +65,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.namenode.FlatINodeFileFeature; import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; @@ -638,17 +640,42 @@ public class BlockManager { } /** - * Convert a specified block of the file to a complete block. - * @param bc file - * @param blkIndex block index in the file - * @throws IOException if the block does not have at least a minimal number - * of replicas reported from data-nodes. + * Commit or complete the last block. Return the length of the last block */ + public long commitOrCompleteLastBlock(FlatINodeFileFeature file, Block + commitBlock) throws IOException { + if(commitBlock == null) + return 0; // not committing, this is a block allocation retry + BlockInfoContiguous lastBlock = getStoredBlock(file.lastBlock()); + if(lastBlock == null) + return 0; // no blocks in file yet + if(lastBlock.isComplete()) + return lastBlock.getNumBytes(); // already completed (e.g. by syncBlock) + + commitBlock((BlockInfoContiguousUnderConstruction) lastBlock, commitBlock); + if(countNodes(lastBlock).liveReplicas() >= minReplication) { + return completeBlock(lastBlock, false).getNumBytes(); + } + return lastBlock.getNumBytes(); + } + private BlockInfoContiguous completeBlock(final BlockCollection bc, final int blkIndex, boolean force) throws IOException { if(blkIndex < 0) return null; - BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex]; + BlockInfoContiguous block = completeBlock(bc.getBlocks()[blkIndex], force); + // replace penultimate block in file + bc.setBlock(blkIndex, block); + return block; + } + + /** + * Convert a specified block of the file to a complete block. + * @throws IOException if the block does not have at least a minimal number + * of replicas reported from data-nodes. + */ + private BlockInfoContiguous completeBlock(BlockInfoContiguous curBlock, + boolean force) throws IOException { if(curBlock.isComplete()) return curBlock; BlockInfoContiguousUnderConstruction ucBlock = @@ -661,9 +688,7 @@ public class BlockManager { throw new IOException( "Cannot complete block: block has not been COMMITTED by the client"); BlockInfoContiguous completeBlock = ucBlock.convertToCompleteBlock(); - // replace penultimate block in file - bc.setBlock(blkIndex, completeBlock); - + // Since safe-mode only counts complete blocks, and we now have // one more complete block, we need to adjust the total up, and // also count it as safe, if we have at least the minimum replica @@ -2800,10 +2825,10 @@ public class BlockManager { processOverReplicatedBlock(block, expectedReplication, null, null); return MisReplicationResult.OVER_REPLICATED; } - + return MisReplicationResult.OK; } - + /** Set replication for the blocks. */ public void setReplication(final short oldRepl, final short newRepl, final String src, final Block b) { @@ -2812,7 +2837,7 @@ public class BlockManager { } // update needReplication priority queues - updateNeededReplications(b, 0, newRepl-oldRepl); + updateNeededReplications(b, 0, newRepl - oldRepl); if (oldRepl > newRepl) { // old replication > the new one; need to remove copies @@ -2845,8 +2870,7 @@ public class BlockManager { if (storage.areBlockContentsStale()) { LOG.info("BLOCK* processOverReplicatedBlock: " + "Postponing processing of over-replicated " + - block + " since storage + " + storage - + "datanode " + cur + " does not yet have up-to-date " + + block + " since storage + " + storage + "datanode " + cur + " does not yet have up-to-date " + "block information."); postponeBlock(block); return; @@ -2862,8 +2886,8 @@ public class BlockManager { } } } - chooseExcessReplicates(nonExcess, block, replication, - addedNode, delNodeHint, blockplacement); + chooseExcessReplicates(nonExcess, block, replication, addedNode, + delNodeHint, blockplacement); } @@ -2969,15 +2993,17 @@ public class BlockManager { private void addToExcessReplicate(DatanodeInfo dn, Block block) { assert namesystem.hasWriteLock(); - LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid()); + LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get( + dn.getDatanodeUuid()); if (excessBlocks == null) { excessBlocks = new LightWeightLinkedSet<Block>(); excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks); } if (excessBlocks.add(block)) { excessBlocksCount.incrementAndGet(); - blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to" - + " excessReplicateMap", dn, block); + blockLog.debug( + "BLOCK* addToExcessReplicate: ({}, {}) is added to" + " excessReplicateMap", + dn, block); } } @@ -2985,8 +3011,7 @@ public class BlockManager { DatanodeDescriptor node) { if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { - queueReportedBlock(storageInfo, block, null, - QUEUE_REASON_FUTURE_GENSTAMP); + queueReportedBlock(storageInfo, block, null, QUEUE_REASON_FUTURE_GENSTAMP); return; } removeStoredBlock(block, node); @@ -3093,7 +3118,7 @@ public class BlockManager { processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } - + private void processAndHandleReportedBlock( DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) @@ -3400,21 +3425,44 @@ public class BlockManager { } } + public boolean checkBlocksProperlyReplicated(String src, + final BlockInfoContiguous[] blocks) { + return checkBlocksProperlyReplicated(src, new Iterable<Block>() { + @Override + public Iterator<Block> iterator() { + return new Iterator<Block>() { + private int index; + @Override + public boolean hasNext() { + return index < blocks.length; + } + + @Override + public Block next() { + return blocks[index++]; + } + }; + } + }); + } + /** * Check that the indicated blocks are present and * replicated. */ - public boolean checkBlocksProperlyReplicated( - String src, BlockInfoContiguous[] blocks) { - for (BlockInfoContiguous b: blocks) { + public boolean checkBlocksProperlyReplicated(String src, Iterable<Block> + blocks) { + for (Block bid : blocks) { + BlockInfoContiguous b = bid instanceof BlockInfoContiguous + ? (BlockInfoContiguous) bid : getStoredBlock(bid); if (!b.isComplete()) { final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)b; final int numNodes = b.numNodes(); LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " - + uc.getBlockUCState() + ", replication# = " + numNodes - + (numNodes < minReplication ? " < ": " >= ") - + " minimum = " + minReplication + ") in file " + src); + + uc.getBlockUCState() + ", replication# = " + numNodes + + (numNodes < minReplication ? " < ": " >= ") + + " minimum = " + minReplication + ") in file " + src); return false; } } @@ -3527,6 +3575,11 @@ public class BlockManager { return blocksMap.addBlockCollection(block, bc); } + public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block, + long bcId) { + return blocksMap.addBlockCollection(block, bcId); + } + public long getBlockCollectionId(Block b) { BlockInfoContiguous bi = getStoredBlock(b); return bi == null ? INodeId.INVALID_INODE_ID : bi.getBlockCollectionId(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 9a1dc29..e18e384 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -97,16 +97,20 @@ class BlocksMap { } } + BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) { + return addBlockCollection(b, bc.getId()); + } + /** * Add block b belonging to the specified block collection to the map. */ - BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) { + BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, long bcId) { BlockInfoContiguous info = blocks.get(b); if (info != b) { info = b; blocks.put(info); } - info.setBlockCollectionId(bc.getId()); + info.setBlockCollectionId(bcId); return info; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 33e31e7..a136b1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.PermissionStatus; +import com.google.protobuf.ByteString; +import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; @@ -173,34 +175,34 @@ class FSDirWriteFileOp { final byte storagePolicyID; String clientMachine; - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); - src = fsn.dir.resolvePath(pc, src, pathComponents); - FileState fileState = analyzeFileState(fsn, src, fileId, clientName, - previous, onRetryBlock); - final INodeFile pendingFile = fileState.inode; - // Check if the penultimate block is minimally replicated - if (!fsn.checkFileProgress(src, pendingFile, false)) { - throw new NotReplicatedYetException("Not replicated yet: " + src); - } + FSDirectory fsd = fsn.getFSDirectory(); + try (ROTransaction tx = fsd.newROTransaction().begin()) { + FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName, + previous, onRetryBlock); + final FlatINode pendingFile = fileState.inode; + FlatINodeFileFeature f = pendingFile.feature(FlatINodeFileFeature.class); + // Check if the penultimate block is minimally replicated + if (!fsn.checkFileProgress(src, f, false)) { + throw new NotReplicatedYetException("Not replicated yet: " + src); + } - if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { - // This is a retry. No need to generate new locations. - // Use the last block if it has locations. - return null; - } - if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) { - throw new IOException("File has reached the limit on maximum number of" - + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY - + "): " + pendingFile.getBlocks().length + " >= " - + fsn.maxBlocksPerFile); + if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) { + // This is a retry. No need to generate new locations. + // Use the last block if it has locations. + return null; + } + if (f.numBlocks() >= fsn.maxBlocksPerFile) { + throw new IOException("File has reached the limit on maximum number of" + + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY + + "): " + f.numBlocks() + " >= " + fsn.maxBlocksPerFile); + } + blockSize = f.blockSize(); + clientMachine = f.clientMachine(); + replication = f.replication(); + storagePolicyID = f.storagePolicyId(); + return new ValidateAddBlockResult(blockSize, replication, storagePolicyID, + clientMachine); } - blockSize = pendingFile.getPreferredBlockSize(); - clientMachine = pendingFile.getFileUnderConstructionFeature() - .getClientMachine(); - replication = pendingFile.getFileReplication(); - storagePolicyID = pendingFile.getStoragePolicyID(); - return new ValidateAddBlockResult(blockSize, replication, storagePolicyID, - clientMachine); } static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk, @@ -226,39 +228,50 @@ class FSDirWriteFileOp { // Run the full analysis again, since things could have changed // while chooseTarget() was executing. LocatedBlock[] onRetryBlock = new LocatedBlock[1]; - FileState fileState = analyzeFileState(fsn, src, fileId, clientName, - previous, onRetryBlock); - final INodeFile pendingFile = fileState.inode; - src = fileState.path; - - if (onRetryBlock[0] != null) { - if (onRetryBlock[0].getLocations().length > 0) { - // This is a retry. Just return the last block if having locations. - return onRetryBlock[0]; - } else { - // add new chosen targets to already allocated block and return - BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock(); - ((BlockInfoContiguousUnderConstruction) lastBlockInFile) - .setExpectedLocations(targets); - offset = pendingFile.computeFileSize(); - return makeLocatedBlock(fsn, lastBlockInFile, targets, offset); + FSDirectory fsd = fsn.getFSDirectory(); + BlockManager bm = fsn.getBlockManager(); + try (RWTransaction tx = fsd.newRWTransaction().begin()) { + FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName, + previous, onRetryBlock); + final FlatINode inode = fileState.inode; + FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class); + src = fileState.path; + + if (onRetryBlock[0] != null) { + if (onRetryBlock[0].getLocations().length > 0) { + // This is a retry. Just return the last block if having locations. + return onRetryBlock[0]; + } else { + // add new chosen targets to already allocated block and return + Block lastBlock = file.lastBlock(); + BlockInfoContiguous lastBlockInFile = bm.getStoredBlock(lastBlock); + ((BlockInfoContiguousUnderConstruction) lastBlockInFile) + .setExpectedLocations(targets); + offset = file.fileSize(); + return makeLocatedBlock(fsn, lastBlockInFile, targets, offset); + } } - } - // commit the last block and complete it if it has minimum replicas - fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip, - ExtendedBlock.getLocalBlock(previous)); + // commit the last block and complete it if it has minimum replicas + FlatINodeFileFeature.Builder newFile = fsn.commitOrCompleteLastBlock( + file, ExtendedBlock.getLocalBlock(previous)); - // allocate new block, record block locations in INode. - Block newBlock = fsn.createNewBlock(); - INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile); - saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets); + // allocate new block, record block locations in INode. + Block newBlock = fsn.createNewBlock(); + saveAllocatedBlock(fsn, src, inode, newBlock, targets); + FlatINode newInode = persistNewBlock(tx, src, inode, newFile, newBlock); + offset = newInode.<FlatINodeFileFeature>feature( + FlatINodeFileFeature.class).fileSize(); - persistNewBlock(fsn, src, pendingFile); - offset = pendingFile.computeFileSize(); + // TODO: Update quota + // check quota limits and updated space consumed +// fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), +// fileINode.getFileReplication(), true); - // Return located block - return makeLocatedBlock(fsn, newBlock, targets, offset); + tx.commit(); + // Return located block + return makeLocatedBlock(fsn, newBlock, targets, offset); + } } static DatanodeStorageInfo[] chooseTargetForNewBlock( @@ -553,72 +566,56 @@ class FSDirWriteFileOp { * Add a block to the file. Returns a reference to the added block. */ private static BlockInfoContiguous addBlock( - FSDirectory fsd, String path, INodesInPath inodesInPath, Block block, + BlockManager bm, String path, FlatINode inode, Block block, DatanodeStorageInfo[] targets) throws IOException { - fsd.writeLock(); - try { - final INodeFile fileINode = inodesInPath.getLastINode().asFile(); - Preconditions.checkState(fileINode.isUnderConstruction()); - - // check quota limits and updated space consumed - fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), - fileINode.getFileReplication(), true); - - // associate new last block for the file - BlockInfoContiguousUnderConstruction blockInfo = + FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class); + Preconditions.checkState(f.inConstruction()); + // associate new last block for the file + BlockInfoContiguousUnderConstruction blockInfo = new BlockInfoContiguousUnderConstruction( - block, - fileINode.getFileReplication(), + block, f.replication(), HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); - fsd.getBlockManager().addBlockCollection(blockInfo, fileINode); - fileINode.addBlock(blockInfo); - if(NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: " - + path + " with " + block - + " block is added to the in-memory " - + "file system"); - } - return blockInfo; - } finally { - fsd.writeUnlock(); + bm.addBlockCollection(blockInfo, inode.id()); + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: " + path + + " with " + block + " block is added to the in-memory file system"); } + return blockInfo; } private static FileState analyzeFileState( - FSNamesystem fsn, String src, long fileId, String clientName, + Transaction tx, FSNamesystem fsn, String src, long fileId, String clientName, ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException { assert fsn.hasReadLock(); - + BlockManager bm = fsn.getBlockManager(); checkBlock(fsn, previous); onRetryBlock[0] = null; - fsn.checkNameNodeSafeMode("Cannot add block to " + src); - - // have we exceeded the configured limit of fs objects. - fsn.checkFsObjectLimit(); Block previousBlock = ExtendedBlock.getLocalBlock(previous); - final INode inode; - final INodesInPath iip; - if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) { + final Resolver.Result paths; + if (true || fileId == HdfsConstants.GRANDFATHER_INODE_ID) { // Older clients may not have given us an inode ID to work with. // In this case, we have to try to resolve the path and hope it // hasn't changed or been deleted since the file was opened for write. - iip = fsn.dir.getINodesInPath4Write(src); - inode = iip.getLastINode(); + paths = Resolver.resolve(tx, src); } else { // Newer clients pass the inode ID, so we can just get the inode // directly. - inode = fsn.dir.getInode(fileId); - iip = INodesInPath.fromINode(inode); - if (inode != null) { - src = iip.getPath(); - } + paths = Resolver.resolveById(tx, fileId); + } + if (paths.invalidPath()) { + throw new InvalidPathException(src); + } else if (paths.notFound()) { + throw new FileNotFoundException(src); } - final INodeFile file = fsn.checkLease(src, clientName, inode, fileId); - BlockInfoContiguous lastBlockInFile = file.getLastBlock(); + FlatINode inode = paths.inodesInPath().getLastINode(); + fsn.checkLease(src, clientName, inode); + FlatINodeFileFeature pendingFile = inode.feature(FlatINodeFileFeature.class); + BlockInfoContiguous lastBlockInFile = pendingFile.lastBlock() == null ? + null : bm.getStoredBlock(pendingFile.lastBlock()); if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) { // The block that the client claims is the current last block // doesn't match up with what we think is the last block. There are @@ -646,10 +643,11 @@ class FSDirWriteFileOp { // changed the namesystem state yet. // We run this analysis again in Part II where case 4 is impossible. - BlockInfoContiguous penultimateBlock = file.getPenultimateBlock(); + BlockInfoContiguous penultimateBlock = bm.getStoredBlock( + pendingFile.penultimateBlock()); if (previous == null && lastBlockInFile != null && - lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() && + lastBlockInFile.getNumBytes() >= pendingFile.blockSize() && lastBlockInFile.isComplete()) { // Case 1 if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -668,15 +666,15 @@ class FSDirWriteFileOp { // Case 2 // Return the last block. - NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " + - "allocation of a new block in " + src + ". Returning previously" + - " allocated block " + lastBlockInFile); - long offset = file.computeFileSize(); + NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + + "caught retry for allocation of a new block in " + + src + ". Returning previously allocated block " + lastBlockInFile); + long offset = pendingFile.fileSize(); BlockInfoContiguousUnderConstruction lastBlockUC = (BlockInfoContiguousUnderConstruction) lastBlockInFile; onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile, lastBlockUC.getExpectedStorageLocations(), offset); - return new FileState(file, src, iip); + return new FileState(inode, src); } else { // Case 3 throw new IOException("Cannot allocate block in " + src + ": " + @@ -684,7 +682,7 @@ class FSDirWriteFileOp { "last block in file " + lastBlockInFile); } } - return new FileState(file, src, iip); + return new FileState(inode, src); } static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc, @@ -779,15 +777,25 @@ class FSDirWriteFileOp { /** * Persist the new block (the last block of the given file). */ - private static void persistNewBlock( - FSNamesystem fsn, String path, INodeFile file) { - Preconditions.checkArgument(file.isUnderConstruction()); - fsn.getEditLog().logAddBlock(path, file); + private static FlatINode persistNewBlock( + RWTransaction tx, String path, FlatINode inode, + FlatINodeFileFeature.Builder newFile, Block newBlock) { + Preconditions.checkArgument(newFile.inConstruction()); + newFile.addBlock(newBlock); + FlatINodeFileFeature newFeature = FlatINodeFileFeature.wrap(newFile.build()); + + FlatINode.Builder builder = new FlatINode.Builder() + .mergeFrom(inode).replaceFeature(newFeature); + ByteString newFileBytes = builder.build(); + FlatINode newInode = FlatINode.wrap(newFileBytes); + tx.putINode(inode.id(), newFileBytes); + tx.logAddBlock(path, newFeature); if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("persistNewBlock: " - + path + " with new block " + file.getLastBlock().toString() - + ", current total block count is " + file.getBlocks().length); + NameNode.stateChangeLog.debug("persistNewBlock: " + path + + " with new block " + newBlock + ", current total block count is " + + newFeature.numBlocks()); } + return newInode; } /** @@ -795,19 +803,18 @@ class FSDirWriteFileOp { * * @param fsn FSNamesystem * @param src path to the file - * @param inodesInPath representing each of the components of src. - * The last INode is the INode for {@code src} file. + * @param inode the file * @param newBlock newly allocated block to be save * @param targets target datanodes where replicas of the new block is placed * @throws QuotaExceededException If addition of block exceeds space quota */ private static void saveAllocatedBlock( - FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock, + FSNamesystem fsn, String src, FlatINode inode, Block newBlock, DatanodeStorageInfo[] targets) throws IOException { assert fsn.hasWriteLock(); - BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock, - targets); + BlockManager bm = fsn.getBlockManager(); + BlockInfoContiguous b = addBlock(bm, src, inode, newBlock, targets); NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src); DatanodeStorageInfo.incrementBlocksScheduled(targets); } @@ -842,14 +849,12 @@ class FSDirWriteFileOp { } private static class FileState { - final INodeFile inode; + final FlatINode inode; final String path; - final INodesInPath iip; - FileState(INodeFile inode, String fullPath, INodesInPath iip) { + FileState(FlatINode inode, String fullPath) { this.inode = inode; this.path = fullPath; - this.iip = iip; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 370050d..739b2d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -772,12 +772,11 @@ public class FSEditLog implements LogsPurgeable { logEdit(op); } - public void logAddBlock(String path, INodeFile file) { - Preconditions.checkArgument(file.isUnderConstruction()); - BlockInfoContiguous[] blocks = file.getBlocks(); - Preconditions.checkState(blocks != null && blocks.length > 0); - BlockInfoContiguous pBlock = blocks.length > 1 ? blocks[blocks.length - 2] : null; - BlockInfoContiguous lastBlock = blocks[blocks.length - 1]; + public void logAddBlock(String path, FlatINodeFileFeature file) { + Preconditions.checkArgument(file.inConstruction()); + Preconditions.checkState(file.numBlocks() > 0); + Block pBlock = file.penultimateBlock(); + Block lastBlock = file.lastBlock(); AddBlockOp op = AddBlockOp.getInstance(cache.get()).setPath(path) .setPenultimateBlock(pBlock).setLastBlock(lastBlock); logEdit(op); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ed28547..5c12b50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2761,6 +2761,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, readLock(); try { checkOperation(OperationCategory.READ); + checkNameNodeSafeMode("Cannot add block to " + src); r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName, previous, onRetryBlock); } finally { @@ -2781,6 +2782,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, LocatedBlock lb; try { checkOperation(OperationCategory.WRITE); + checkNameNodeSafeMode("Cannot add block to " + src); lb = FSDirWriteFileOp.storeAllocatedBlock( this, src, fileId, clientName, previous, targets); } finally { @@ -2919,7 +2921,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } return file; } - + + void checkLease(String src, String holder, FlatINode inode) + throws LeaseExpiredException, FileNotFoundException { + assert hasReadLock(); + final String ident = src; + if (inode == null) { + throw new FileNotFoundException(src); + } else if (!inode.isFile()) { + Lease lease = leaseManager.getLease(holder); + throw new LeaseExpiredException( + "No lease on " + ident + ": INode is not a regular file. " + + (lease != null ? lease.toString() + : "Holder " + holder + " does not have any open files.")); + } + FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class); + if (!f.inConstruction()) { + Lease lease = leaseManager.getLease(holder); + throw new LeaseExpiredException( + "No lease on " + ident + ": File is not open for writing. " + + (lease != null ? lease.toString() + : "Holder " + holder + " does not have any open files.")); + } + String clientName = f.clientName(); + if (holder != null && !clientName.equals(holder)) { + throw new LeaseExpiredException("Lease mismatch on " + ident + + " owned by " + clientName + " but is accessed by " + holder); + } + } + /** * Complete in-progress write to the given file. * @return true if successful, false if the client should continue to retry @@ -2977,6 +3007,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } /** + * Check that the indicated file's blocks are present and + * replicated. If not, return false. If checkall is true, then check + * all blocks, otherwise check only penultimate block. + */ + boolean checkFileProgress(String src, FlatINodeFileFeature v, + boolean checkall) { + assert hasReadLock(); + if (checkall) { + return blockManager.checkBlocksProperlyReplicated(src, v.blocks()); + } else { + // check the penultimate block of this file + Block b = v.penultimateBlock(); + return b == null || + blockManager.checkBlocksProperlyReplicated(src, Lists.newArrayList + (b)); + } + } + + /** * Change the indicated filename. * @deprecated Use {@link #renameTo(String, String, boolean, * Options.Rename...)} instead. @@ -3537,6 +3586,28 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } } + FlatINodeFileFeature.Builder commitOrCompleteLastBlock( + FlatINodeFileFeature f, Block commitBlock) + throws IOException { + assert hasWriteLock(); + Preconditions.checkArgument(f.inConstruction()); + if (commitBlock == null) { + return new FlatINodeFileFeature.Builder().mergeFrom(f); + } + + Preconditions.checkState(f.numBlocks() > 0); + long newBlockLength = + blockManager.commitOrCompleteLastBlock(f, commitBlock); + FlatINodeFileFeature.Builder b = new FlatINodeFileFeature.Builder() + .mergeFrom(f); + Block newBlock = new Block(commitBlock); + newBlock.setNumBytes(newBlockLength); + b.block(f.numBlocks() - 1, newBlock); + return b; + // TODO: Update quota + // Adjust disk space consumption if required + } + void finalizeINodeFileUnderConstruction( String src, INodeFile pendingFile, int latestSnapshot) throws IOException { assert hasWriteLock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java index b97b11f..4db8fad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java @@ -159,4 +159,8 @@ class RWTransaction extends Transaction { boolean overwrite, boolean logRetryCache) { fsd.getEditLog().logOpenFile(ugid, src, inode, overwrite, logRetryCache); } + + public void logAddBlock(String src, FlatINodeFileFeature file) { + fsd.getEditLog().logAddBlock(src, file); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java index 5420084..06f9e90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java @@ -127,6 +127,10 @@ class Resolver { return resolve(tx, path); } + public static Result resolveById(Transaction tx, long id) { + throw new IllegalArgumentException("Unimplemented"); + } + // public static Result getInodeById(Transaction tx, long id) // throws IOException { // byte[] inodeKey = Encoding.inodeIdKey(id);