This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 0f1a9a81d58f41d80f2925de7b0945c678e086eb Author: Renukaprasad C <prasad_a...@yahoo.co.in> AuthorDate: Fri Jul 23 15:24:34 2021 -0700 HDFS-16130. [FGL] Implement CREATE File with FGL. Contributed by Renukaprasad C. (#3205) --- .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java | 36 ++++++--- .../hdfs/server/namenode/FSDirWriteFileOp.java | 94 +++++++++++++++------- 2 files changed, 87 insertions(+), 43 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java index ef08e9e..f6febe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java @@ -70,18 +70,7 @@ class FSDirMkdirOp { // create multiple inodes. fsn.checkFsObjectLimit(); - // create all missing directories along the path, - // but don't add them to the INodeMap yet - permissions = addImplicitUwx(permissions, permissions); // SHV !!! - INode[] missing = createPathDirectories(fsd, iip, permissions); - iip = iip.getExistingINodes(); - // switch the locks - fsd.getINodeMap().latchWriteLock(iip, missing); - // Add missing inodes to the INodeMap - for(INode dir : missing) { - iip = addSingleDirectory(fsd, iip, dir, permissions); - assert iip != null : "iip should not be null"; - } + iip = createMissingDirs(fsd, iip, permissions); } return fsd.getAuditFileInfo(iip); } finally { @@ -89,6 +78,26 @@ class FSDirMkdirOp { } } + static INodesInPath createMissingDirs(FSDirectory fsd, + INodesInPath iip, PermissionStatus permissions) throws IOException { + // create all missing directories along the path, + // but don't add them to the INodeMap yet + permissions = addImplicitUwx(permissions, permissions); // SHV !!! 
+ INode[] missing = createPathDirectories(fsd, iip, permissions); + iip = iip.getExistingINodes(); + if (missing.length == 0) { + return iip; + } + // switch the locks + fsd.getINodeMap().latchWriteLock(iip, missing); + // Add missing inodes to the INodeMap + for (INode dir : missing) { + iip = addSingleDirectory(fsd, iip, dir, permissions); + assert iip != null : "iip should not be null"; + } + return iip; + } + /** * For a given absolute path, create all ancestors as directories along the * path. All ancestors inherit their parent's permission plus an implicit @@ -253,6 +262,9 @@ class FSDirMkdirOp { return dir; } + /** + * Find-out missing iNodes for the current mkdir OP. + */ private static INode[] createPathDirectories(FSDirectory fsd, INodesInPath iip, PermissionStatus perm) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 0d9c6ae..f2cca7b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -228,6 +228,13 @@ class FSDirWriteFileOp { // while chooseTarget() was executing. 
LocatedBlock[] onRetryBlock = new LocatedBlock[1]; INodesInPath iip = fsn.dir.resolvePath(null, src, fileId); + + INode[] missing = new INode[]{iip.getLastINode()}; + INodesInPath existing = iip.getParentINodesInPath(); + FSDirectory fsd = fsn.getFSDirectory(); + // switch the locks + fsd.getINodeMap().latchWriteLock(existing, missing); + FileState fileState = analyzeFileState(fsn, iip, fileId, clientName, previous, onRetryBlock); final INodeFile pendingFile = fileState.inode; @@ -392,8 +399,8 @@ class FSDirWriteFileOp { } fsn.checkFsObjectLimit(); INodeFile newNode = null; - INodesInPath parent = - FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions); + INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd, + iip.getParentINodesInPath(), permissions); if (parent != null) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, replication, blockSize, holder, clientMachine, shouldReplicate, @@ -541,41 +548,22 @@ class FSDirWriteFileOp { FSDirectory fsd, INodesInPath existing, byte[] localName, PermissionStatus permissions, short replication, long preferredBlockSize, String clientName, String clientMachine, boolean shouldReplicate, - String ecPolicyName, String storagePolicy) throws IOException { + String ecPolicyName, String storagePolicy) + throws IOException { Preconditions.checkNotNull(existing); long modTime = now(); INodesInPath newiip; fsd.writeLock(); try { - boolean isStriped = false; - ErasureCodingPolicy ecPolicy = null; - byte storagepolicyid = 0; - if (storagePolicy != null && !storagePolicy.isEmpty()) { - BlockStoragePolicy policy = - fsd.getBlockManager().getStoragePolicy(storagePolicy); - if (policy == null) { - throw new HadoopIllegalArgumentException( - "Cannot find a block policy with the name " + storagePolicy); - } - storagepolicyid = policy.getId(); - } - if (!shouldReplicate) { - ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( - fsd.getFSNamesystem(), ecPolicyName, existing); - if (ecPolicy != null && 
(!ecPolicy.isReplicationPolicy())) { - isStriped = true; - } - } - final BlockType blockType = isStriped ? - BlockType.STRIPED : BlockType.CONTIGUOUS; - final Short replicationFactor = (!isStriped ? replication : null); - final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null); - INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, - modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize, - storagepolicyid, blockType); - newNode.setLocalName(localName); - newNode.toUnderConstruction(clientName, clientMachine); + INodeFile newNode = createINodeFile(fsd, existing, localName, + permissions, replication, preferredBlockSize, clientName, + clientMachine, shouldReplicate, ecPolicyName, storagePolicy, modTime); + + INode[] missing = new INode[] {newNode}; + // switch the locks + fsd.getINodeMap().latchWriteLock(existing, missing); + newiip = fsd.addINode(existing, newNode, permissions.getPermission()); } finally { fsd.writeUnlock(); @@ -593,6 +581,42 @@ class FSDirWriteFileOp { return newiip; } + private static INodeFile createINodeFile(FSDirectory fsd, + INodesInPath existing, byte[] localName, PermissionStatus permissions, + short replication, long preferredBlockSize, String clientName, + String clientMachine, boolean shouldReplicate, String ecPolicyName, + String storagePolicy, long modTime) throws IOException { + boolean isStriped = false; + ErasureCodingPolicy ecPolicy = null; + byte storagepolicyid = 0; + if (storagePolicy != null && !storagePolicy.isEmpty()) { + BlockStoragePolicy policy = + fsd.getBlockManager().getStoragePolicy(storagePolicy); + if (policy == null) { + throw new HadoopIllegalArgumentException( + "Cannot find a block policy with the name " + storagePolicy); + } + storagepolicyid = policy.getId(); + } + if (!shouldReplicate) { + ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( + fsd.getFSNamesystem(), ecPolicyName, existing); + if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) { + isStriped = 
true; + } + } + final BlockType blockType = isStriped ? + BlockType.STRIPED : BlockType.CONTIGUOUS; + final Short replicationFactor = (!isStriped ? replication : null); + final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null); + INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, + modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize, + storagepolicyid, blockType); + newNode.setLocalName(localName); + newNode.toUnderConstruction(clientName, clientMachine); + return newNode; + } + private static FileState analyzeFileState( FSNamesystem fsn, INodesInPath iip, long fileId, String clientName, ExtendedBlock previous, LocatedBlock[] onRetryBlock) @@ -687,6 +711,14 @@ class FSDirWriteFileOp { } checkBlock(fsn, last); INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId); + + assert (iip.getLastINode() instanceof INodeFile); + INode[] missing = new INode[] {iip.getLastINode()}; + INodesInPath existing = iip.getParentINodesInPath(); + // switch the locks + FSDirectory fsd = fsn.getFSDirectory(); + fsd.getINodeMap().latchWriteLock(existing, missing); + return completeFileInternal(fsn, iip, holder, ExtendedBlock.getLocalBlock(last), fileId); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org