Author: wheat9
Date: Tue Jun 24 06:56:27 2014
New Revision: 1605016

URL: http://svn.apache.org/r1605016
Log:
HDFS-6562. Refactor rename() in FSDirectory. Contributed by Haohui Mai.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1605016&r1=1605015&r2=1605016&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jun 24 
06:56:27 2014
@@ -467,6 +467,8 @@ Release 2.5.0 - UNRELEASED
     HDFS-6578. add toString method to DatanodeStorage for easier debugging.
     (Yongjun Zhang via Arpit Agarwal)
 
+    HDFS-6562. Refactor rename() in FSDirectory. (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1605016&r1=1605015&r2=1605016&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 (original)
+++ 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 Tue Jun 24 06:56:27 2014
@@ -67,7 +67,6 @@ import org.apache.hadoop.hdfs.protocol.S
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -469,55 +468,30 @@ public class FSDirectory implements Clos
     assert hasWriteLock();
     INodesInPath srcIIP = getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
-    
-    // check the validation of the source
-    if (srcInode == null) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst
-          + " because source does not exist");
-      return false;
-    } 
-    if (srcIIP.getINodes().length == 1) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          +"failed to rename "+src+" to "+dst+ " because source is the root");
+    try {
+      validateRenameSource(src, srcIIP);
+    } catch (SnapshotException e) {
+      throw e;
+    } catch (IOException ignored) {
       return false;
     }
-    
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    List<INodeDirectorySnapshottable> snapshottableDirs = 
-        new ArrayList<INodeDirectorySnapshottable>();
-    checkSnapshot(srcInode, snapshottableDirs);
-    
+
     if (isDir(dst)) {
       dst += Path.SEPARATOR + new Path(src).getName();
     }
-    
-    // check the validity of the destination
+
+    // validate the destination
     if (dst.equals(src)) {
       return true;
     }
-    if (srcInode.isSymlink() && 
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException(
-          "Cannot rename symlink "+src+" to its target "+dst);
-    }
-    
-    // dst cannot be directory or a file under src
-    if (dst.startsWith(src) && 
-        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst
-          + " because destination starts with src");
+
+    try {
+      validateRenameDestination(src, dst, srcInode);
+    } catch (IOException ignored) {
       return false;
     }
-    
-    byte[][] dstComponents = INode.getPathComponents(dst);
-    INodesInPath dstIIP = getExistingPathINodes(dstComponents);
-    if (dstIIP.isSnapshot()) {
-      throw new SnapshotAccessControlException(
-          "Modification on RO snapshot is disallowed");
-    }
+
+    INodesInPath dstIIP = getINodesInPath4Write(dst, false);
     if (dstIIP.getLastINode() != null) {
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                    +"failed to rename "+src+" to "+dst+ 
@@ -535,42 +509,10 @@ public class FSDirectory implements Clos
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
-    
+
+    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
+
     boolean added = false;
-    INode srcChild = srcIIP.getLastINode();
-    final byte[] srcChildName = srcChild.getLocalNameBytes();
-    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshotId());
-    final boolean srcChildIsReference = srcChild.isReference();
-    
-    // Record the snapshot on srcChild. After the rename, before any new 
-    // snapshot is taken on the dst tree, changes will be recorded in the 
latest
-    // snapshot of the src tree.
-    if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
-      srcIIP.setLastINode(srcChild);
-    }
-    
-    // check srcChild for reference
-    final INodeReference.WithCount withCount;
-    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();
-    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-    if (isSrcInSnapshot) {
-      final INodeReference.WithName withName = 
-          srcIIP.getINode(-2).asDirectory().replaceChild4ReferenceWithName(
-              srcChild, srcIIP.getLatestSnapshotId()); 
-      withCount = (INodeReference.WithCount) withName.getReferredINode();
-      srcChild = withName;
-      srcIIP.setLastINode(srcChild);
-      // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
-    } else if (srcChildIsReference) {
-      // srcChild is reference but srcChild is not in latest snapshot
-      withCount = (WithCount) srcChild.asReference().getReferredINode();
-    } else {
-      withCount = null;
-    }
 
     try {
       // remove src
@@ -581,84 +523,22 @@ public class FSDirectory implements Clos
             + " because the source can not be removed");
         return false;
       }
-      
-      if (dstParent.getParent() == null) {
-        // src and dst file/dir are in the same directory, and the dstParent 
has
-        // been replaced when we removed the src. Refresh the dstIIP and
-        // dstParent.
-        dstIIP = getExistingPathINodes(dstComponents);
-        dstParent = dstIIP.getINode(-2);
-      }
-      
-      // add src to the destination
-      
-      srcChild = srcIIP.getLastINode();
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
-        toDst = new INodeReference.DstReference(
-            dstParent.asDirectory(), withCount, dstSnapshotId);
-      }
-      
-      added = addLastINodeNoQuotaCheck(dstIIP, toDst);
+
+      added = tx.addSourceToDestination();
       if (added) {
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: 
" 
               + src + " is renamed to " + dst);
         }
-        // update modification time of dst and the parent of src
-        final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, 
srcIIP.getLatestSnapshotId());
-        dstParent = dstIIP.getINode(-2); // refresh dstParent
-        dstParent.updateModificationTime(timestamp, 
dstIIP.getLatestSnapshotId());
-        // update moved leases with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);     
-
-        // update the quota usage in src tree
-        if (isSrcInSnapshot) {
-          // get the counts after rename
-          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false);
-          newSrcCounts.subtract(oldSrcCounts);
-          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-              newSrcCounts.get(Quota.DISKSPACE), false);
-        }
+
+        tx.updateMtimeAndLease(timestamp);
+        tx.updateQuotasInSourceTree();
         
         return true;
       }
     } finally {
       if (!added) {
-        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
-        final INode oldSrcChild = srcChild;
-        // put it back
-        if (withCount == null) {
-          srcChild.setLocalName(srcChildName);
-        } else if (!srcChildIsReference) { // src must be in snapshot
-          // the withCount node will no longer be used thus no need to update
-          // its reference number here
-          srcChild = withCount.getReferredINode();
-          srcChild.setLocalName(srcChildName);
-        } else {
-          withCount.removeReference(oldSrcChild.asReference());
-          srcChild = new INodeReference.DstReference(
-              srcParent, withCount, srcRefDstSnapshot);
-          withCount.getReferredINode().setLocalName(srcChildName);
-        }
-        
-        if (isSrcInSnapshot) {
-          // srcParent must have snapshot feature since isSrcInSnapshot is true
-          // and src node has been removed from srcParent 
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-        } else {
-          // original srcChild is not in latest snapshot, we only need to add
-          // the srcChild back
-          addLastINodeNoQuotaCheck(srcIIP, srcChild);
-        }
+        tx.restoreSource();
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -681,53 +561,21 @@ public class FSDirectory implements Clos
       FileNotFoundException, ParentNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException, IOException {
     assert hasWriteLock();
-    boolean overwrite = false;
-    if (null != options) {
-      for (Rename option : options) {
-        if (option == Rename.OVERWRITE) {
-          overwrite = true;
-        }
-      }
-    }
+    boolean overwrite = options != null && Arrays.asList(options).contains
+            (Rename.OVERWRITE);
+
     final String error;
     final INodesInPath srcIIP = getINodesInPath4Write(src, false);
     final INode srcInode = srcIIP.getLastINode();
-    // validate source
-    if (srcInode == null) {
-      error = "rename source " + src + " is not found.";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new FileNotFoundException(error);
-    }
-    if (srcIIP.getINodes().length == 1) {
-      error = "rename source cannot be the root";
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
-    // srcInode and its subtree cannot contain snapshottable directories with
-    // snapshots
-    checkSnapshot(srcInode, null);
-    
+    validateRenameSource(src, srcIIP);
+
     // validate the destination
     if (dst.equals(src)) {
       throw new FileAlreadyExistsException(
           "The source "+src+" and destination "+dst+" are the same");
     }
-    if (srcInode.isSymlink() && 
-        dst.equals(srcInode.asSymlink().getSymlinkString())) {
-      throw new FileAlreadyExistsException(
-          "Cannot rename symlink "+src+" to its target "+dst);
-    }
-    // dst cannot be a directory or a file under src
-    if (dst.startsWith(src) && 
-        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-      error = "Rename destination " + dst
-          + " is a directory or file under source " + src;
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + error);
-      throw new IOException(error);
-    }
+    validateRenameDestination(src, dst, srcInode);
+
     INodesInPath dstIIP = getINodesInPath4Write(dst, false);
     if (dstIIP.getINodes().length == 1) {
       error = "rename destination cannot be the root";
@@ -740,30 +588,7 @@ public class FSDirectory implements Clos
     List<INodeDirectorySnapshottable> snapshottableDirs = 
         new ArrayList<INodeDirectorySnapshottable>();
     if (dstInode != null) { // Destination exists
-      // It's OK to rename a file to a symlink and vice versa
-      if (dstInode.isDirectory() != srcInode.isDirectory()) {
-        error = "Source " + src + " and destination " + dst
-            + " must both be directories";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-      if (!overwrite) { // If destination exists, overwrite flag must be true
-        error = "rename destination " + dst + " already exists";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new FileAlreadyExistsException(error);
-      }
-      if (dstInode.isDirectory()) {
-        final ReadOnlyList<INode> children = dstInode.asDirectory()
-            .getChildrenList(Snapshot.CURRENT_STATE_ID);
-        if (!children.isEmpty()) {
-          error = "rename destination directory is not empty: " + dst;
-          NameNode.stateChangeLog.warn(
-              "DIR* FSDirectory.unprotectedRenameTo: " + error);
-          throw new IOException(error);
-        }
-      }
+      validateRenameOverwrite(src, dst, overwrite, srcInode, dstInode);
       checkSnapshot(dstInode, snapshottableDirs);
     }
 
@@ -785,40 +610,8 @@ public class FSDirectory implements Clos
     verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
 
-    INode srcChild = srcIIP.getLastINode();
-    final byte[] srcChildName = srcChild.getLocalNameBytes();
-    final boolean isSrcInSnapshot = srcChild.isInLatestSnapshot(
-        srcIIP.getLatestSnapshotId());
-    final boolean srcChildIsReference = srcChild.isReference();
-    
-    // Record the snapshot on srcChild. After the rename, before any new 
-    // snapshot is taken on the dst tree, changes will be recorded in the 
latest
-    // snapshot of the src tree.
-    if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
-      srcIIP.setLastINode(srcChild);
-    }
-    
-    // check srcChild for reference
-    final INodeReference.WithCount withCount;
-    int srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
-        .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-    Quota.Counts oldSrcCounts = Quota.Counts.newInstance();    
-    if (isSrcInSnapshot) {
-      final INodeReference.WithName withName = 
srcIIP.getINode(-2).asDirectory()
-          .replaceChild4ReferenceWithName(srcChild, 
srcIIP.getLatestSnapshotId()); 
-      withCount = (INodeReference.WithCount) withName.getReferredINode();
-      srcChild = withName;
-      srcIIP.setLastINode(srcChild);
-      // get the counts before rename
-      withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
-    } else if (srcChildIsReference) {
-      // srcChild is reference but srcChild is not in latest snapshot
-      withCount = (WithCount) srcChild.asReference().getReferredINode();
-    } else {
-      withCount = null;
-    }
-    
+    RenameOperation tx = new RenameOperation(src, dst, srcIIP, dstIIP);
+
     boolean undoRemoveSrc = true;
     final long removedSrc = removeLastINode(srcIIP);
     if (removedSrc == -1) {
@@ -829,13 +622,6 @@ public class FSDirectory implements Clos
       throw new IOException(error);
     }
     
-    if (dstParent.getParent() == null) {
-      // src and dst file/dir are in the same directory, and the dstParent has
-      // been replaced when we removed the src. Refresh the dstIIP and
-      // dstParent.
-      dstIIP = getINodesInPath4Write(dst, false);
-    }
-    
     boolean undoRemoveDst = false;
     INode removedDst = null;
     long removedNum = 0;
@@ -846,23 +632,9 @@ public class FSDirectory implements Clos
           undoRemoveDst = true;
         }
       }
-      
-      srcChild = srcIIP.getLastINode();
-
-      final byte[] dstChildName = dstIIP.getLastLocalName();
-      final INode toDst;
-      if (withCount == null) {
-        srcChild.setLocalName(dstChildName);
-        toDst = srcChild;
-      } else {
-        withCount.getReferredINode().setLocalName(dstChildName);
-        int dstSnapshotId = dstIIP.getLatestSnapshotId();
-        toDst = new INodeReference.DstReference(
-            dstIIP.getINode(-2).asDirectory(), withCount, dstSnapshotId);
-      }
 
       // add src as dst to complete rename
-      if (addLastINodeNoQuotaCheck(dstIIP, toDst)) {
+      if (tx.addSourceToDestination()) {
         undoRemoveSrc = false;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
@@ -870,12 +642,7 @@ public class FSDirectory implements Clos
               + " is renamed to " + dst);
         }
 
-        final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, 
srcIIP.getLatestSnapshotId());
-        dstParent = dstIIP.getINode(-2);
-        dstParent.updateModificationTime(timestamp, 
dstIIP.getLatestSnapshotId());
-        // update moved lease with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);
+        tx.updateMtimeAndLease(timestamp);
 
         // Collect the blocks and remove the lease for previous dst
         long filesDeleted = -1;
@@ -897,47 +664,15 @@ public class FSDirectory implements Clos
           // deleted. Need to update the SnapshotManager.
           namesystem.removeSnapshottableDirs(snapshottableDirs);
         }
-        
-        // update the quota usage in src tree
-        if (isSrcInSnapshot) {
-          // get the counts after rename
-          Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-              Quota.Counts.newInstance(), false);
-          newSrcCounts.subtract(oldSrcCounts);
-          srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-              newSrcCounts.get(Quota.DISKSPACE), false);
-        }
-        
+
+        tx.updateQuotasInSourceTree();
         return filesDeleted >= 0;
       }
     } finally {
       if (undoRemoveSrc) {
-        // Rename failed - restore src
-        final INodeDirectory srcParent = srcIIP.getINode(-2).asDirectory();
-        final INode oldSrcChild = srcChild;
-        // put it back
-        if (withCount == null) {
-          srcChild.setLocalName(srcChildName);
-        } else if (!srcChildIsReference) { // src must be in snapshot
-          // the withCount node will no longer be used thus no need to update
-          // its reference number here
-          srcChild = withCount.getReferredINode();
-          srcChild.setLocalName(srcChildName);
-        } else {
-          withCount.removeReference(oldSrcChild.asReference());
-          srcChild = new INodeReference.DstReference(
-              srcParent, withCount, srcRefDstSnapshot);
-          withCount.getReferredINode().setLocalName(srcChildName);
-        }
-        
-        if (srcParent.isWithSnapshot()) {
-          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
-        } else {
-          // srcParent is not an INodeDirectoryWithSnapshot, we only need to 
add
-          // the srcChild back
-          addLastINodeNoQuotaCheck(srcIIP, srcChild);
-        }
+        tx.restoreSource();
       }
+
       if (undoRemoveDst) {
         // Rename failed - restore dst
         if (dstParent.isDirectory() && 
dstParent.asDirectory().isWithSnapshot()) {
@@ -958,7 +693,200 @@ public class FSDirectory implements Clos
         + "failed to rename " + src + " to " + dst);
     throw new IOException("rename from " + src + " to " + dst + " failed.");
   }
-  
+
+  private static void validateRenameOverwrite(String src, String dst,
+                                              boolean overwrite,
+                                              INode srcInode, INode dstInode)
+          throws IOException {
+    String error;// It's OK to rename a file to a symlink and vice versa
+    if (dstInode.isDirectory() != srcInode.isDirectory()) {
+      error = "Source " + src + " and destination " + dst
+          + " must both be directories";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    if (!overwrite) { // If destination exists, overwrite flag must be true
+      error = "rename destination " + dst + " already exists";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileAlreadyExistsException(error);
+    }
+    if (dstInode.isDirectory()) {
+      final ReadOnlyList<INode> children = dstInode.asDirectory()
+          .getChildrenList(Snapshot.CURRENT_STATE_ID);
+      if (!children.isEmpty()) {
+        error = "rename destination directory is not empty: " + dst;
+        NameNode.stateChangeLog.warn(
+            "DIR* FSDirectory.unprotectedRenameTo: " + error);
+        throw new IOException(error);
+      }
+    }
+  }
+
+  private static void validateRenameDestination(String src, String dst, INode 
srcInode)
+          throws IOException {
+    String error;
+    if (srcInode.isSymlink() &&
+        dst.equals(srcInode.asSymlink().getSymlinkString())) {
+      throw new FileAlreadyExistsException(
+          "Cannot rename symlink "+src+" to its target "+dst);
+    }
+    // dst cannot be a directory or a file under src
+    if (dst.startsWith(src) &&
+        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      error = "Rename destination " + dst
+          + " is a directory or file under source " + src;
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+  }
+
+  private static void validateRenameSource(String src, INodesInPath srcIIP)
+          throws IOException {
+    String error;
+    final INode srcInode = srcIIP.getLastINode();
+    // validate source
+    if (srcInode == null) {
+      error = "rename source " + src + " is not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileNotFoundException(error);
+    }
+    if (srcIIP.getINodes().length == 1) {
+      error = "rename source cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    // srcInode and its subtree cannot contain snapshottable directories with
+    // snapshots
+    checkSnapshot(srcInode, null);
+  }
+
+
+
+  private class RenameOperation {
+    private final INodesInPath srcIIP;
+    private final INodesInPath dstIIP;
+    private final String src;
+    private final String dst;
+
+    private INode srcChild;
+    private final INodeReference.WithCount withCount;
+    private final int srcRefDstSnapshot;
+    private final INodeDirectory srcParent;
+    private final byte[] srcChildName;
+    private final boolean isSrcInSnapshot;
+    private final boolean srcChildIsReference;
+    private final Quota.Counts oldSrcCounts;
+
+    private RenameOperation(String src, String dst, INodesInPath srcIIP, 
INodesInPath dstIIP)
+            throws QuotaExceededException {
+      this.srcIIP = srcIIP;
+      this.dstIIP = dstIIP;
+      this.src = src;
+      this.dst = dst;
+      srcChild = srcIIP.getLastINode();
+      srcChildName = srcChild.getLocalNameBytes();
+      isSrcInSnapshot = srcChild.isInLatestSnapshot(
+              srcIIP.getLatestSnapshotId());
+      srcChildIsReference = srcChild.isReference();
+      srcParent = srcIIP.getINode(-2).asDirectory();
+
+      // Record the snapshot on srcChild. After the rename, before any new
+      // snapshot is taken on the dst tree, changes will be recorded in the 
latest
+      // snapshot of the src tree.
+      if (isSrcInSnapshot) {
+        srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
+      }
+
+      // check srcChild for reference
+      srcRefDstSnapshot = srcChildIsReference ? srcChild.asReference()
+              .getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
+      oldSrcCounts = Quota.Counts.newInstance();
+      if (isSrcInSnapshot) {
+        final INodeReference.WithName withName = 
srcIIP.getINode(-2).asDirectory()
+                .replaceChild4ReferenceWithName(srcChild, 
srcIIP.getLatestSnapshotId());
+        withCount = (INodeReference.WithCount) withName.getReferredINode();
+        srcChild = withName;
+        srcIIP.setLastINode(srcChild);
+        // get the counts before rename
+        withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
+      } else if (srcChildIsReference) {
+        // srcChild is reference but srcChild is not in latest snapshot
+        withCount = (WithCount) srcChild.asReference().getReferredINode();
+      } else {
+        withCount = null;
+      }
+    }
+
+    boolean addSourceToDestination() {
+      final INode dstParent = dstIIP.getINode(-2);
+      srcChild = srcIIP.getLastINode();
+      final byte[] dstChildName = dstIIP.getLastLocalName();
+      final INode toDst;
+      if (withCount == null) {
+        srcChild.setLocalName(dstChildName);
+        toDst = srcChild;
+      } else {
+        withCount.getReferredINode().setLocalName(dstChildName);
+        int dstSnapshotId = dstIIP.getLatestSnapshotId();
+        toDst = new INodeReference.DstReference(
+                dstParent.asDirectory(), withCount, dstSnapshotId);
+      }
+      return addLastINodeNoQuotaCheck(dstIIP, toDst);
+    }
+
+    void updateMtimeAndLease(long timestamp) throws QuotaExceededException {
+      srcParent.updateModificationTime(timestamp, 
srcIIP.getLatestSnapshotId());
+      final INode dstParent = dstIIP.getINode(-2);
+      dstParent.updateModificationTime(timestamp, 
dstIIP.getLatestSnapshotId());
+      // update moved lease with new filename
+      getFSNamesystem().unprotectedChangeLease(src, dst);
+    }
+
+    void restoreSource() throws QuotaExceededException {
+      // Rename failed - restore src
+      final INode oldSrcChild = srcChild;
+      // put it back
+      if (withCount == null) {
+        srcChild.setLocalName(srcChildName);
+      } else if (!srcChildIsReference) { // src must be in snapshot
+        // the withCount node will no longer be used thus no need to update
+        // its reference number here
+        srcChild = withCount.getReferredINode();
+        srcChild.setLocalName(srcChildName);
+      } else {
+        withCount.removeReference(oldSrcChild.asReference());
+        srcChild = new INodeReference.DstReference(
+                srcParent, withCount, srcRefDstSnapshot);
+        withCount.getReferredINode().setLocalName(srcChildName);
+      }
+
+      if (isSrcInSnapshot) {
+        srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild);
+      } else {
+        // srcParent is not an INodeDirectoryWithSnapshot, we only need to add
+        // the srcChild back
+        addLastINodeNoQuotaCheck(srcIIP, srcChild);
+      }
+    }
+
+    void updateQuotasInSourceTree() throws QuotaExceededException {
+      // update the quota usage in src tree
+      if (isSrcInSnapshot) {
+        // get the counts after rename
+        Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
+                Quota.Counts.newInstance(), false);
+        newSrcCounts.subtract(oldSrcCounts);
+        srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
+                newSrcCounts.get(Quota.DISKSPACE), false);
+      }
+    }
+  }
+
   /**
    * Set file replication
    * 
@@ -1332,14 +1260,14 @@ public class FSDirectory implements Clos
    *                          but do not have snapshots yet
    */
   private static void checkSnapshot(INode target,
-      List<INodeDirectorySnapshottable> snapshottableDirs) throws IOException {
+      List<INodeDirectorySnapshottable> snapshottableDirs) throws 
SnapshotException {
     if (target.isDirectory()) {
       INodeDirectory targetDir = target.asDirectory();
       if (targetDir.isSnapshottable()) {
         INodeDirectorySnapshottable ssTargetDir = 
             (INodeDirectorySnapshottable) targetDir;
         if (ssTargetDir.getNumSnapshots() > 0) {
-          throw new IOException("The directory " + 
ssTargetDir.getFullPathName()
+          throw new SnapshotException("The directory " + 
ssTargetDir.getFullPathName()
               + " cannot be deleted since " + ssTargetDir.getFullPathName()
               + " is snapshottable and already has snapshots");
         } else {
@@ -2053,10 +1981,6 @@ public class FSDirectory implements Clos
     if (!parent.removeChild(last, latestSnapshot)) {
       return -1;
     }
-    INodeDirectory newParent = last.getParent();
-    if (parent != newParent) {
-      iip.setINode(-2, newParent);
-    }
     
     if (!last.isInLatestSnapshot(latestSnapshot)) {
       final Quota.Counts counts = last.computeQuotaUsage();


Reply via email to