Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java Tue Aug 19 23:49:39 2014
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -32,6 +33,10 @@ import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,13 +50,17 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.web.JsonUtil;
+import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.SecurityUtil;
@@ -64,6 +73,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
+import org.mortbay.util.ajax.JSON;
 
 import com.google.common.base.Joiner;
 
@@ -290,7 +300,8 @@ public class TestDelegationTokensWithHA 
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
     
     URI haUri = new URI("hdfs://my-ha-uri/");
-    token.setService(HAUtil.buildTokenServiceForLogicalUri(haUri));
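+    // buildTokenServiceForLogicalUri now also takes the URI scheme.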
+    token.setService(HAUtil.buildTokenServiceForLogicalUri(haUri,
+        HdfsConstants.HDFS_URI_SCHEME));
     ugi.addToken(token);
 
     Collection<InetSocketAddress> nnAddrs = new HashSet<InetSocketAddress>();
@@ -346,7 +357,8 @@ public class TestDelegationTokensWithHA 
   @Test
   public void testDFSGetCanonicalServiceName() throws Exception {
     URI hAUri = HATestUtil.getLogicalUri(cluster);
-    String haService = HAUtil.buildTokenServiceForLogicalUri(hAUri).toString();
+    String haService = HAUtil.buildTokenServiceForLogicalUri(hAUri,
+        HdfsConstants.HDFS_URI_SCHEME).toString();
     assertEquals(haService, dfs.getCanonicalServiceName());
     final String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
     final Token<DelegationTokenIdentifier> token =
@@ -362,7 +374,8 @@ public class TestDelegationTokensWithHA 
     Configuration conf = dfs.getConf();
     URI haUri = HATestUtil.getLogicalUri(cluster);
     AbstractFileSystem afs = AbstractFileSystem.createFileSystem(haUri, conf);
-    String haService = HAUtil.buildTokenServiceForLogicalUri(haUri).toString();
+    String haService = HAUtil.buildTokenServiceForLogicalUri(haUri,
+        HdfsConstants.HDFS_URI_SCHEME).toString();
     assertEquals(haService, afs.getCanonicalServiceName());
     Token<?> token = afs.getDelegationTokens(
         UserGroupInformation.getCurrentUser().getShortUserName()).get(0);
@@ -372,6 +385,90 @@ public class TestDelegationTokensWithHA 
     token.cancel(conf);
   }
   
+  /**
+   * Test that the standby NN throws StandbyException when it is asked for
+   * a delegation token password (HDFS-6475). With the StandbyException,
+   * the client can fail over and retry the active NN.
+   */
+  @Test
+  public void testDelegationTokenStandbyNNAppearFirst() throws Exception {
+    // make nn0 the standby NN, and nn1 the active NN
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+
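+    // nn1 is now the active NN; its secret manager should hand out the
+    // token password successfully.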
+    final DelegationTokenSecretManager stSecretManager = 
+        NameNodeAdapter.getDtSecretManager(
+            nn1.getNamesystem());
+
+    // create token
+    final Token<DelegationTokenIdentifier> token =
+        getDelegationToken(fs, "JobTracker");
+    final DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
+    byte[] tokenId = token.getIdentifier();
+    identifier.readFields(new DataInputStream(
+             new ByteArrayInputStream(tokenId)));
+
+    assertTrue(null != stSecretManager.retrievePassword(identifier));
+
+    final UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser("JobTracker");
+    ugi.addToken(token);
+    
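+    // dtSecretManager belongs to nn0, which is now in standby state, so
+    // retrievePassword() is expected to throw InvalidToken caused by
+    // StandbyException.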
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() {
+        try {
+          try {
+            byte[] tmppw = dtSecretManager.retrievePassword(identifier);
+            fail("InvalidToken with cause StandbyException is expected"
+                + " since nn0 is standby");
+            return tmppw;
+          } catch (IOException e) {
+            // Mimic the UserProvider class logic (server side) by throwing
+            // SecurityException here
+            throw new SecurityException(
+                "Failed to obtain user group information: " + e, e);
+          }
+        } catch (Exception oe) {
+          //
+          // The exception oe caught here is
+          //     java.lang.SecurityException: Failed to obtain user group
+          //     information: org.apache.hadoop.security.token.
+          //     SecretManager$InvalidToken: StandbyException
+          //
+          HttpServletResponse response = mock(HttpServletResponse.class);
+          ExceptionHandler eh = new ExceptionHandler();
+          eh.initResponse(response);
+          
+          // The Response (resp) below is what the server will send to the client
+          //
+          // BEFORE HDFS-6475 fix, the resp.entity is
+          //     {"RemoteException":{"exception":"SecurityException",
+          //      "javaClassName":"java.lang.SecurityException",
+          //      "message":"Failed to obtain user group information: 
+          //      org.apache.hadoop.security.token.SecretManager$InvalidToken:
+          //        StandbyException"}}
+          // AFTER the fix, the resp.entity is
+          //     {"RemoteException":{"exception":"StandbyException",
+          //      "javaClassName":"org.apache.hadoop.ipc.StandbyException",
+          //      "message":"Operation category READ is not supported in
+          //       state standby"}}
+          //
+          Response resp = eh.toResponse(oe);
+          
+          // Mimic the client side logic by parsing the response from server
+          //
+          Map<?, ?> m = (Map<?, ?>)JSON.parse(resp.getEntity().toString());
+          RemoteException re = JsonUtil.toRemoteException(m);
+          Exception unwrapped = ((RemoteException)re).unwrapRemoteException(
+              StandbyException.class);
+          assertTrue (unwrapped instanceof StandbyException);
+          return null;
+        }
+      }
+    });
+  }
+
   @SuppressWarnings("unchecked")
   private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
       String renewer) throws IOException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java Tue Aug 19 23:49:39 2014
@@ -63,6 +63,8 @@ public class TestFailoverWithBlockTokens
   public void startCluster() throws IOException {
     conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(1)

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java Tue Aug 19 23:49:39 2014
@@ -104,7 +104,7 @@ public class TestFailureToReadEdits {
     HAUtil.setAllowStandbyReads(conf, true);
     
     if (clusterType == TestType.SHARED_DIR_HA) {
-      MiniDFSNNTopology topology = MiniQJMHACluster.createDefaultTopology();
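+      // Assumption: the 10000 argument is the base port for the default
+      // QJM HA topology.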
+      MiniDFSNNTopology topology = MiniQJMHACluster.createDefaultTopology(10000);
       cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(topology)
         .numDataNodes(0)

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java Tue Aug 19 23:49:39 2014
@@ -36,6 +36,7 @@ import org.apache.commons.logging.impl.L
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -819,10 +820,13 @@ public class TestHASafeMode {
           null);
       create.write(testData.getBytes());
       create.hflush();
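+      // Record the file id so that the previous block can be looked up
+      // by id rather than by path.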
+      long fileId = ((DFSOutputStream)create.
+          getWrappedStream()).getFileId();
+      FileStatus fileStatus = dfs.getFileStatus(filePath);
       DFSClient client = DFSClientAdapter.getClient(dfs);
       // add one dummy block at NN, but not write to DataNode
-      ExtendedBlock previousBlock = DFSClientAdapter.getPreviousBlock(client,
-          pathString);
+      ExtendedBlock previousBlock =
+          DFSClientAdapter.getPreviousBlock(client, fileId);
       DFSClientAdapter.getNamenode(client).addBlock(
           pathString,
           client.getClientName(),

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java Tue Aug 19 23:49:39 2014
@@ -356,7 +356,8 @@ public class TestPipelinesFailover {
       
       NameNode nn0 = cluster.getNameNode(0);
       ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
-      DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
+      DatanodeDescriptor expectedPrimary =
+          DFSTestUtil.getExpectedPrimaryNode(nn0, blk);
       LOG.info("Expecting block recovery to be triggered on DN " +
           expectedPrimary);
       
@@ -506,37 +507,6 @@ public class TestPipelinesFailover {
     }
   }
 
-
-
-  /**
-   * @return the node which is expected to run the recovery of the
-   * given block, which is known to be under construction inside the
-   * given NameNOde.
-   */
-  private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
-      ExtendedBlock blk) {
-    BlockManager bm0 = nn.getNamesystem().getBlockManager();
-    BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
-    assertTrue("Block " + blk + " should be under construction, " +
-        "got: " + storedBlock,
-        storedBlock instanceof BlockInfoUnderConstruction);
-    BlockInfoUnderConstruction ucBlock =
-      (BlockInfoUnderConstruction)storedBlock;
-    // We expect that the replica with the most recent heart beat will be
-    // the one to be in charge of the synchronization / recovery protocol.
-    final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
-    DatanodeStorageInfo expectedPrimary = storages[0];
-    long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
-    for (int i = 1; i < storages.length; i++) {
-      final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
-      if (lastUpdate > mostRecentLastUpdate) {
-        expectedPrimary = storages[i];
-        mostRecentLastUpdate = lastUpdate;
-      }
-    }
-    return expectedPrimary.getDatanodeDescriptor();
-  }
-
   private DistributedFileSystem createFsAsOtherUser(
       final MiniDFSCluster cluster, final Configuration conf)
       throws IOException, InterruptedException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java Tue Aug 19 23:49:39 2014
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -45,9 +46,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -126,6 +129,7 @@ public class TestRetryCacheWithHA {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, ResponseSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, ResponseSize);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
         .numDataNodes(DataNodes).build();
@@ -157,7 +161,7 @@ public class TestRetryCacheWithHA {
     FSNamesystem fsn0 = cluster.getNamesystem(0);
     LightWeightCache<CacheEntry, CacheEntry> cacheSet = 
         (LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
-    assertEquals(20, cacheSet.size());
+    assertEquals(23, cacheSet.size());
     
     Map<CacheEntry, CacheEntry> oldEntries = 
         new HashMap<CacheEntry, CacheEntry>();
@@ -178,7 +182,7 @@ public class TestRetryCacheWithHA {
     FSNamesystem fsn1 = cluster.getNamesystem(1);
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
         .getRetryCache().getCacheSet();
-    assertEquals(20, cacheSet.size());
+    assertEquals(23, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();
@@ -188,12 +192,9 @@ public class TestRetryCacheWithHA {
   
   private DFSClient genClientWithDummyHandler() throws IOException {
     URI nnUri = dfs.getUri();
-    Class<FailoverProxyProvider<ClientProtocol>> failoverProxyProviderClass = 
-        NameNodeProxies.getFailoverProxyProviderClass(conf, nnUri, 
-            ClientProtocol.class);
     FailoverProxyProvider<ClientProtocol> failoverProxyProvider = 
         NameNodeProxies.createFailoverProxyProvider(conf, 
-            failoverProxyProviderClass, ClientProtocol.class, nnUri);
+            nnUri, ClientProtocol.class, true);
     InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
         failoverProxyProvider, RetryPolicies
         .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
@@ -725,7 +726,12 @@ public class TestRetryCacheWithHA {
       
       client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
           newBlock, newNodes, storageIDs);
-      out.close();
+      // close() can fail if it commits the block after the NameNode has
+      // already received block RECEIVED notifications from the DataNode.
+      // Since the DataNodes and the output stream still have the old
+      // genstamps, these blocks will be marked as corrupt after HDFS-5723
+      // if the RECEIVED notifications reach the NameNode first, so abort
+      // the stream instead of closing it.
+      DFSTestUtil.abortStream((DFSOutputStream) out.getWrappedStream());
     }
 
     @Override
@@ -1004,6 +1010,91 @@ public class TestRetryCacheWithHA {
       return null;
     }
   }
+  
+  /** setXAttr */
+  class SetXAttrOp extends AtMostOnceOp {
+    private final String src;
+
+    SetXAttrOp(DFSClient client, String src) {
+      super("setXAttr", client);
+      this.src = src;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      Path p = new Path(src);
+      if (!dfs.exists(p)) {
+        DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.setXAttr(src, "user.key", "value".getBytes(),
+          EnumSet.of(XAttrSetFlag.CREATE));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
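+      // Poll once per second, up to CHECKTIMES attempts, until the new
+      // xattr becomes visible on the NameNode.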
+      for (int i = 0; i < CHECKTIMES; i++) {
+        Map<String, byte[]> iter = dfs.getXAttrs(new Path(src));
+        Set<String> keySet = iter.keySet();
+        if (keySet.contains("user.key")) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** removeXAttr */
+  class RemoveXAttrOp extends AtMostOnceOp {
+    private final String src;
+
+    RemoveXAttrOp(DFSClient client, String src) {
+      super("removeXAttr", client);
+      this.src = src;
+    }
+
+    @Override
+    void prepare() throws Exception {
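+      // Create the file and set the xattr that invoke() will remove.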
+      Path p = new Path(src);
+      if (!dfs.exists(p)) {
+        DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+        client.setXAttr(src, "user.key", "value".getBytes(),
+          EnumSet.of(XAttrSetFlag.CREATE));
+      }
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.removeXAttr(src, "user.key");
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
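+      // Poll once per second, up to CHECKTIMES attempts, until the xattr
+      // disappears from the NameNode.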
+      for (int i = 0; i < CHECKTIMES; i++) {
+        Map<String, byte[]> iter = dfs.getXAttrs(new Path(src));
+        Set<String> keySet = iter.keySet();
+        if (!keySet.contains("user.key")) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
 
   @Test (timeout=60000)
   public void testCreateSnapshot() throws Exception {
@@ -1133,6 +1224,20 @@ public class TestRetryCacheWithHA {
     AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
     testClientRetryWithFailover(op);
   }
+  
+  @Test (timeout=60000)
+  public void testSetXAttr() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new SetXAttrOp(client, "/setxattr");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testRemoveXAttr() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RemoveXAttrOp(client, "/removexattr");
+    testClientRetryWithFailover(op);
+  }
 
   /**
    * When NN failover happens, if the client did not receive the response and

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java Tue Aug 19 23:49:39 2014
@@ -25,12 +25,14 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.BindException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.net.URI;
 import java.net.URL;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,53 +68,74 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-
+import com.google.common.io.Files;
 
 public class TestStandbyCheckpoints {
   private static final int NUM_DIRS_IN_LOG = 200000;
   protected MiniDFSCluster cluster;
   protected NameNode nn0, nn1;
   protected FileSystem fs;
+  private final Random random = new Random();
+  protected File tmpOivImgDir;
   
   private static final Log LOG = LogFactory.getLog(TestStandbyCheckpoints.class);
 
   @SuppressWarnings("rawtypes")
   @Before
   public void setupCluster() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
-    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
-    
+    Configuration conf = setupCommonConfig();
+
     // Dial down the retention of extra edits and checkpoints. This is to
     // help catch regressions of HDFS-4238 (SBN should not purge shared edits)
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);
-    
+
+    int retryCount = 0;
+    while (true) {
+      try {
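+        // Pick an even base HTTP port in [10060, 10258]; on a
+        // BindException the loop retries with a fresh pair of ports.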
+        int basePort = 10060 + random.nextInt(100) * 2;
+        MiniDFSNNTopology topology = new MiniDFSNNTopology()
+            .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+                .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort))
+                .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1)));
+
+        cluster = new MiniDFSCluster.Builder(conf)
+            .nnTopology(topology)
+            .numDataNodes(0)
+            .build();
+        cluster.waitActive();
+
+        nn0 = cluster.getNameNode(0);
+        nn1 = cluster.getNameNode(1);
+        fs = HATestUtil.configureFailoverFs(cluster, conf);
+
+        cluster.transitionToActive(0);
+        ++retryCount;
+        break;
+      } catch (BindException e) {
+        LOG.info("Set up MiniDFSCluster failed due to port conflicts, retry "
+            + retryCount + " times");
+      }
+    }
+  }
+
+  protected Configuration setupCommonConfig() {
+    tmpOivImgDir = Files.createTempDir();
+
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
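+    // Write the legacy OIV image into a temp dir so the test can check
+    // that the standby saves it during checkpointing.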
+    conf.set(DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY,
+        tmpOivImgDir.getAbsolutePath());
     conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
     conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
         SlowCodec.class.getCanonicalName());
     CompressionCodecFactory.setCodecClasses(conf,
         ImmutableList.<Class>of(SlowCodec.class));
-
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10061))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10062)));
-    
-    cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(topology)
-      .numDataNodes(0)
-      .build();
-    cluster.waitActive();
-    
-    nn0 = cluster.getNameNode(0);
-    nn1 = cluster.getNameNode(1);
-    fs = HATestUtil.configureFailoverFs(cluster, conf);
-
-    cluster.transitionToActive(0);
+    return conf;
   }
-  
+
   @After
   public void shutdownCluster() throws IOException {
     if (cluster != null) {
@@ -129,6 +152,21 @@ public class TestStandbyCheckpoints {
     // Once the standby catches up, it should notice that it needs to
     // do a checkpoint and save one to its local directories.
     HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
+
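+    // Wait up to 60 seconds for the standby to write the legacy OIV image.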
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return tmpOivImgDir.list().length > 0;
+      }
+    }, 1000, 60000);
+    
+    // It should have saved the oiv image too.
+    assertEquals("One file is expected", 1, tmpOivImgDir.list().length);
     
     // It should also upload it back to the active.
     HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestAclWithSnapshot.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -119,14 +120,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0750, path);
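+    // 010750 is 0750 plus a high-order indicator bit (1 << 12); assumption:
+    // the test helper encodes the presence of an ACL in the permission short.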
+    assertPermission((short)010750, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0750, snapshotPath);
+    assertPermission((short)010750, snapshotPath);
 
     assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
@@ -153,14 +154,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "diana", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0550, path);
+    assertPermission((short)010550, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0750, snapshotPath);
+    assertPermission((short)010750, snapshotPath);
 
     assertDirPermissionDenied(fsAsBruce, BRUCE, path);
     assertDirPermissionGranted(fsAsDiana, DIANA, path);
@@ -202,24 +203,24 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(filePath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, filePath);
+    assertPermission((short)010550, filePath);
 
     s = hdfs.getAclStatus(subdirPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, subdirPath);
+    assertPermission((short)010550, subdirPath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, fileSnapshotPath);
+    assertPermission((short)010550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, subdirSnapshotPath);
+    assertPermission((short)010550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
 
@@ -251,14 +252,14 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(filePath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0570, filePath);
+    assertPermission((short)010570, filePath);
     assertFilePermissionDenied(fsAsBruce, BRUCE, filePath);
     assertFilePermissionGranted(fsAsDiana, DIANA, filePath);
 
     s = hdfs.getAclStatus(subdirPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0570, subdirPath);
+    assertPermission((short)010570, subdirPath);
     assertDirPermissionDenied(fsAsBruce, BRUCE, subdirPath);
     assertDirPermissionGranted(fsAsDiana, DIANA, subdirPath);
 
@@ -268,14 +269,14 @@ public class TestAclWithSnapshot {
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, fileSnapshotPath);
+    assertPermission((short)010550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, subdirSnapshotPath);
+    assertPermission((short)010550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
   }
@@ -302,14 +303,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0750, path);
+    assertPermission((short)010750, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0750, snapshotPath);
+    assertPermission((short)010750, snapshotPath);
 
     assertDirPermissionGranted(fsAsBruce, BRUCE, snapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, snapshotPath);
@@ -336,7 +337,7 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_EXECUTE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0750, snapshotPath);
+    assertPermission((short)010750, snapshotPath);
 
     assertDirPermissionDenied(fsAsBruce, BRUCE, path);
     assertDirPermissionDenied(fsAsDiana, DIANA, path);
@@ -378,24 +379,24 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(filePath);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, filePath);
+    assertPermission((short)010550, filePath);
 
     s = hdfs.getAclStatus(subdirPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, subdirPath);
+    assertPermission((short)010550, subdirPath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, fileSnapshotPath);
+    assertPermission((short)010550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, subdirSnapshotPath);
+    assertPermission((short)010550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
 
@@ -437,14 +438,14 @@ public class TestAclWithSnapshot {
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, fileSnapshotPath);
+    assertPermission((short)010550, fileSnapshotPath);
     assertFilePermissionGranted(fsAsBruce, BRUCE, fileSnapshotPath);
     assertFilePermissionDenied(fsAsDiana, DIANA, fileSnapshotPath);
 
     s = hdfs.getAclStatus(subdirSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0550, subdirSnapshotPath);
+    assertPermission((short)010550, subdirSnapshotPath);
     assertDirPermissionGranted(fsAsBruce, BRUCE, subdirSnapshotPath);
     assertDirPermissionDenied(fsAsDiana, DIANA, subdirSnapshotPath);
   }
@@ -470,7 +471,7 @@ public class TestAclWithSnapshot {
     AclStatus s = hdfs.getAclStatus(path);
     AclEntry[] returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(expected, returned);
-    assertPermission((short)0770, path);
+    assertPermission((short)010770, path);
     assertDirPermissionGranted(fsAsBruce, BRUCE, path);
     assertDirPermissionGranted(fsAsDiana, DIANA, path);
   }
@@ -514,7 +515,7 @@ public class TestAclWithSnapshot {
       aclEntry(DEFAULT, GROUP, NONE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)0700, path);
+    assertPermission((short)010700, path);
 
     s = hdfs.getAclStatus(snapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
@@ -524,7 +525,7 @@ public class TestAclWithSnapshot {
       aclEntry(DEFAULT, GROUP, NONE),
       aclEntry(DEFAULT, MASK, READ_EXECUTE),
       aclEntry(DEFAULT, OTHER, NONE) }, returned);
-    assertPermission((short)0700, snapshotPath);
+    assertPermission((short)010700, snapshotPath);
 
     assertDirPermissionDenied(fsAsBruce, BRUCE, snapshotPath);
   }
@@ -596,14 +597,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0660, filePath);
+    assertPermission((short)010660, filePath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0660, filePath);
+    assertPermission((short)010660, filePath);
 
     aclSpec = Lists.newArrayList(
       aclEntry(ACCESS, USER, "bruce", READ));
@@ -632,14 +633,14 @@ public class TestAclWithSnapshot {
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0660, filePath);
+    assertPermission((short)010660, filePath);
 
     s = hdfs.getAclStatus(fileSnapshotPath);
     returned = s.getEntries().toArray(new AclEntry[0]);
     assertArrayEquals(new AclEntry[] {
       aclEntry(ACCESS, USER, "bruce", READ_WRITE),
       aclEntry(ACCESS, GROUP, NONE) }, returned);
-    assertPermission((short)0660, filePath);
+    assertPermission((short)010660, filePath);
 
     aclSpec = Lists.newArrayList(
       aclEntry(ACCESS, USER, "bruce", READ));
@@ -674,6 +675,13 @@ public class TestAclWithSnapshot {
     } catch (AccessControlException e) {
       // expected
     }
+
+    try {
+      fs.access(pathToCheck, FsAction.READ);
+      fail("The access call should have failed for "+pathToCheck);
+    } catch (AccessControlException e) {
+      // expected
+    }
   }
 
   /**
@@ -689,6 +697,7 @@ public class TestAclWithSnapshot {
       UserGroupInformation user, Path pathToCheck) throws Exception {
     try {
       fs.listStatus(pathToCheck);
+      fs.access(pathToCheck, FsAction.READ);
     } catch (AccessControlException e) {
       fail("expected permission granted for user " + user + ", path = " +
         pathToCheck);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java Tue Aug 19 23:49:39 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.log4j.Level;
@@ -153,8 +154,7 @@ public class TestINodeFileUnderConstruct
     // deleted list, with size BLOCKSIZE*2
     INodeFile fileNode = (INodeFile) fsdir.getINode(file.toString());
     assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize());
-    INodeDirectorySnapshottable dirNode = (INodeDirectorySnapshottable) fsdir
-        .getINode(dir.toString());
+    INodeDirectory dirNode = fsdir.getINode(dir.toString()).asDirectory();
     DirectoryDiff last = dirNode.getDiffs().getLast();
     
     // 2. append without closing stream
@@ -162,7 +162,7 @@ public class TestINodeFileUnderConstruct
     out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
     
     // re-check nodeInDeleted_S0
-    dirNode = (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    dirNode = fsdir.getINode(dir.toString()).asDirectory();
     assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize(last.getSnapshotId()));
     
     // 3. take snapshot --> close stream
@@ -172,7 +172,7 @@ public class TestINodeFileUnderConstruct
     // check: an INodeFileUnderConstructionWithSnapshot with size BLOCKSIZE*3 should
     // have been stored in s1's deleted list
     fileNode = (INodeFile) fsdir.getINode(file.toString());
-    dirNode = (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    dirNode = fsdir.getINode(dir.toString()).asDirectory();
     last = dirNode.getDiffs().getLast();
     assertTrue(fileNode.isWithSnapshot());
     assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(last.getSnapshotId()));

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java Tue Aug 19 23:49:39 2014
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import static org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SNAPSHOT_LIMIT;
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature.SNAPSHOT_LIMIT;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -312,10 +312,9 @@ public class TestNestedSnapshots {
   public void testIdCmp() {
     final PermissionStatus perm = PermissionStatus.createImmutable(
         "user", "group", FsPermission.createImmutable((short)0));
-    final INodeDirectory dir = new INodeDirectory(0,
+    final INodeDirectory snapshottable = new INodeDirectory(0,
         DFSUtil.string2Bytes("foo"), perm, 0L);
-    final INodeDirectorySnapshottable snapshottable
-        = new INodeDirectorySnapshottable(dir);
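+    // Snapshottability is now a feature flag on INodeDirectory rather
+    // than the separate INodeDirectorySnapshottable subclass.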
+    snapshottable.addSnapshottableFeature();
     final Snapshot[] snapshots = {
       new Snapshot(1, "s1", snapshottable),
       new Snapshot(1, "s1", snapshottable),
@@ -362,7 +361,7 @@ public class TestNestedSnapshots {
     
     hdfs.allowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());
-    assertTrue(subNode instanceof INodeDirectorySnapshottable);
+    assertTrue(subNode.isDirectory() && subNode.asDirectory().isSnapshottable());
     
     hdfs.disallowSnapshot(sub);
     subNode = fsdir.getINode(sub.toString());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java Tue Aug 19 23:49:39 2014
@@ -169,12 +169,11 @@ public class TestRenameWithSnapshots {
   }
   
   private static boolean existsInDiffReport(List<DiffReportEntry> entries,
-      DiffType type, String relativePath) {
+      DiffType type, String sourcePath, String targetPath) {
     for (DiffReportEntry entry : entries) {
-      System.out.println("DiffEntry is:" + entry.getType() + "\""
-          + new String(entry.getRelativePath()) + "\"");
-      if ((entry.getType() == type)
-          && ((new String(entry.getRelativePath())).compareTo(relativePath) == 0)) {
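+      // Build the expected DiffReportEntry (including the target path for
+      // renames) and rely on its equals() for an exact match.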
+      if (entry.equals(new DiffReportEntry(type, DFSUtil
+          .string2Bytes(sourcePath), targetPath == null ? null : DFSUtil
+          .string2Bytes(targetPath)))) {
         return true;
       }
     }
@@ -197,8 +196,9 @@ public class TestRenameWithSnapshots {
     SnapshotDiffReport diffReport = hdfs.getSnapshotDiffReport(sub1, snap1, "");
     List<DiffReportEntry> entries = diffReport.getDiffList();
     assertTrue(entries.size() == 2);
-    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, ""));
-    assertTrue(existsInDiffReport(entries, DiffType.CREATE, file2.getName()));
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, "", null));
+    assertTrue(existsInDiffReport(entries, DiffType.CREATE, file2.getName(),
+        null));
   }
 
   /**
@@ -217,10 +217,10 @@ public class TestRenameWithSnapshots {
     SnapshotDiffReport diffReport = hdfs.getSnapshotDiffReport(sub1, snap1, "");
     System.out.println("DiffList is " + diffReport.toString());
     List<DiffReportEntry> entries = diffReport.getDiffList();
-    assertTrue(entries.size() == 3);
-    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, ""));
-    assertTrue(existsInDiffReport(entries, DiffType.CREATE, file2.getName()));
-    assertTrue(existsInDiffReport(entries, DiffType.DELETE, file1.getName()));
+    assertTrue(entries.size() == 2);
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, "", null));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, file1.getName(),
+        file2.getName()));
   }
 
   @Test (timeout=60000)
@@ -240,26 +240,26 @@ public class TestRenameWithSnapshots {
     diffReport = hdfs.getSnapshotDiffReport(sub1, snap1, snap2);
     LOG.info("DiffList is " + diffReport.toString());
     List<DiffReportEntry> entries = diffReport.getDiffList();
-    assertTrue(entries.size() == 3);
-    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, ""));
-    assertTrue(existsInDiffReport(entries, DiffType.CREATE, file2.getName()));
-    assertTrue(existsInDiffReport(entries, DiffType.DELETE, file1.getName()));
+    assertTrue(entries.size() == 2);
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, "", null));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, file1.getName(),
+        file2.getName()));
     
     diffReport = hdfs.getSnapshotDiffReport(sub1, snap2, "");
     LOG.info("DiffList is " + diffReport.toString());
     entries = diffReport.getDiffList();
-    assertTrue(entries.size() == 3);
-    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, ""));
-    assertTrue(existsInDiffReport(entries, DiffType.CREATE, file3.getName()));
-    assertTrue(existsInDiffReport(entries, DiffType.DELETE, file2.getName()));
+    assertTrue(entries.size() == 2);
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, "", null));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, file2.getName(),
+        file3.getName()));
     
     diffReport = hdfs.getSnapshotDiffReport(sub1, snap1, "");
     LOG.info("DiffList is " + diffReport.toString());
     entries = diffReport.getDiffList();
-    assertTrue(entries.size() == 3);
-    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, ""));
-    assertTrue(existsInDiffReport(entries, DiffType.CREATE, file3.getName()));
-    assertTrue(existsInDiffReport(entries, DiffType.DELETE, file1.getName()));
+    assertTrue(entries.size() == 2);
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, "", null));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, file1.getName(),
+        file3.getName()));
   }
   
   @Test (timeout=60000)
@@ -282,11 +282,10 @@ public class TestRenameWithSnapshots {
         "");
     LOG.info("DiffList is \n\"" + diffReport.toString() + "\"");
     List<DiffReportEntry> entries = diffReport.getDiffList();
-    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, sub2.getName()));
-    assertTrue(existsInDiffReport(entries, DiffType.CREATE, sub2.getName()
-        + "/" + sub2file2.getName()));
-    assertTrue(existsInDiffReport(entries, DiffType.DELETE, sub2.getName()
-        + "/" + sub2file1.getName()));
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, sub2.getName(),
+        null));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, sub2.getName()
+        + "/" + sub2file1.getName(), sub2.getName() + "/" + 
sub2file2.getName()));
   }
 
   @Test (timeout=60000)
@@ -309,10 +308,10 @@ public class TestRenameWithSnapshots {
         "");
     LOG.info("DiffList is \n\"" + diffReport.toString() + "\"");
     List<DiffReportEntry> entries = diffReport.getDiffList();
-    assertEquals(3, entries.size());
-    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, ""));
-    assertTrue(existsInDiffReport(entries, DiffType.CREATE, sub3.getName()));
-    assertTrue(existsInDiffReport(entries, DiffType.DELETE, sub2.getName()));
+    assertEquals(2, entries.size());
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, "", null));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, sub2.getName(),
+        sub3.getName()));
   }
   
   /**
@@ -403,8 +402,7 @@ public class TestRenameWithSnapshots {
     final Path foo_s3 = SnapshotTestHelper.getSnapshotPath(sdir1, "s3",
         "foo");
     assertFalse(hdfs.exists(foo_s3));
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
     Snapshot s2 = sdir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     INodeFile sfoo = fsdir.getINode(newfoo.toString()).asFile();
     assertEquals(s2.getId(), sfoo.getDiffs().getLastSnapshotId());
@@ -607,8 +605,7 @@ public class TestRenameWithSnapshots {
     
     INodeFile snode = fsdir.getINode(newfoo.toString()).asFile();
     assertEquals(1, snode.getDiffs().asList().size());
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
     Snapshot s2 = sdir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     assertEquals(s2.getId(), snode.getDiffs().getLastSnapshotId());
     
@@ -763,8 +760,7 @@ public class TestRenameWithSnapshots {
     assertEquals(2, fooWithCount.getReferenceCount());
     INodeDirectory foo = fooWithCount.asDirectory();
     assertEquals(1, foo.getDiffs().asList().size());
-    INodeDirectorySnapshottable sdir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir1.toString());
+    INodeDirectory sdir1Node = fsdir.getINode(sdir1.toString()).asDirectory();
     Snapshot s1 = sdir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
     assertEquals(s1.getId(), foo.getDirectoryWithSnapshotFeature()
         .getLastSnapshotId());
@@ -973,12 +969,9 @@ public class TestRenameWithSnapshots {
     hdfs.rename(bar_dir2, bar_dir1);
     
     // check the internal details
-    INodeDirectorySnapshottable sdir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir1.toString());
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
-    INodeDirectorySnapshottable sdir3Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir3.toString());
+    INodeDirectory sdir1Node = fsdir.getINode(sdir1.toString()).asDirectory();
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
+    INodeDirectory sdir3Node = fsdir.getINode(sdir3.toString()).asDirectory();
     
     INodeReference fooRef = fsdir.getINode4Write(foo_dir1.toString())
         .asReference();
@@ -1183,8 +1176,7 @@ public class TestRenameWithSnapshots {
     assertTrue(hdfs.exists(bar_s2));
     
     // check internal details
-    INodeDirectorySnapshottable sdir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode(sdir2.toString());
+    INodeDirectory sdir2Node = fsdir.getINode(sdir2.toString()).asDirectory();
     Snapshot s2 = sdir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     final Path foo_s2 = SnapshotTestHelper.getSnapshotPath(sdir2, "s2", "foo");
     INodeReference fooRef = fsdir.getINode(foo_s2.toString()).asReference();
@@ -1291,8 +1283,8 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
     
     // check the current internal details
-    INodeDirectorySnapshottable dir1Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir1.toString());
+    INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Snapshot s1 = dir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
     ReadOnlyList<INode> dir1Children = dir1Node
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1361,8 +1353,8 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
     
     // check the current internal details
-    INodeDirectorySnapshottable dir1Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir1.toString());
+    INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Snapshot s1 = dir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
     ReadOnlyList<INode> dir1Children = dir1Node
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1428,11 +1420,11 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
     
     // check the current internal details
-    INodeDirectorySnapshottable dir1Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir1.toString());
+    INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Snapshot s1 = dir1Node.getSnapshot(DFSUtil.string2Bytes("s1"));
-    INodeDirectorySnapshottable dir2Node = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(sdir2.toString());
+    INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
+        .asDirectory();
     Snapshot s2 = dir2Node.getSnapshot(DFSUtil.string2Bytes("s2"));
     ReadOnlyList<INode> dir2Children = dir2Node
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1459,8 +1451,7 @@ public class TestRenameWithSnapshots {
     assertFalse(result);
 
     // check internal details again
-    dir2Node = (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir2
-        .toString());
+    dir2Node = fsdir.getINode4Write(sdir2.toString()).asDirectory();
     Snapshot s3 = dir2Node.getSnapshot(DFSUtil.string2Bytes("s3"));
     fooNode = fsdir.getINode4Write(foo_dir2.toString());
     dir2Children = dir2Node.getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1600,8 +1591,8 @@ public class TestRenameWithSnapshots {
     assertTrue(diff.getChildrenDiff().getList(ListType.DELETED).isEmpty());
     
     // check dir2
-    INode dir2Node = fsdir.getINode4Write(dir2.toString());
-    assertTrue(dir2Node.getClass() == INodeDirectorySnapshottable.class);
+    INodeDirectory dir2Node = fsdir.getINode4Write(dir2.toString()).asDirectory();
+    assertTrue(dir2Node.isSnapshottable());
     Quota.Counts counts = dir2Node.computeQuotaUsage();
     assertEquals(3, counts.get(Quota.NAMESPACE));
     assertEquals(0, counts.get(Quota.DISKSPACE));
@@ -1611,8 +1602,7 @@ public class TestRenameWithSnapshots {
     INode subdir2Node = childrenList.get(0);
     assertSame(dir2Node, subdir2Node.getParent());
     assertSame(subdir2Node, fsdir.getINode4Write(subdir2.toString()));
-    diffList = ((INodeDirectorySnapshottable) dir2Node)
-        .getDiffs().asList();
+    diffList = dir2Node.getDiffs().asList();
     assertEquals(1, diffList.size());
     diff = diffList.get(0);
     assertTrue(diff.getChildrenDiff().getList(ListType.CREATED).isEmpty());
@@ -1674,8 +1664,8 @@ public class TestRenameWithSnapshots {
     assertTrue(diff.getChildrenDiff().getList(ListType.DELETED).isEmpty());
     
     // check dir2
-    INode dir2Node = fsdir.getINode4Write(dir2.toString());
-    assertTrue(dir2Node.getClass() == INodeDirectorySnapshottable.class);
+    INodeDirectory dir2Node = fsdir.getINode4Write(dir2.toString()).asDirectory();
+    assertTrue(dir2Node.isSnapshottable());
     Quota.Counts counts = dir2Node.computeQuotaUsage();
     assertEquals(4, counts.get(Quota.NAMESPACE));
     assertEquals(0, counts.get(Quota.DISKSPACE));
@@ -1690,7 +1680,7 @@ public class TestRenameWithSnapshots {
     assertTrue(subsubdir2Node.getClass() == INodeDirectory.class);
     assertSame(subdir2Node, subsubdir2Node.getParent());
     
-    diffList = ((INodeDirectorySnapshottable) dir2Node).getDiffs().asList();
+    diffList = dir2Node.getDiffs().asList();
     assertEquals(1, diffList.size());
     diff = diffList.get(0);
     assertTrue(diff.getChildrenDiff().getList(ListType.CREATED).isEmpty());
@@ -1724,8 +1714,8 @@ public class TestRenameWithSnapshots {
     }
     
     // check
-    INodeDirectorySnapshottable rootNode = (INodeDirectorySnapshottable) fsdir
-        .getINode4Write(root.toString());
+    INodeDirectory rootNode = fsdir.getINode4Write(root.toString())
+        .asDirectory();
     INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory();
     ReadOnlyList<INode> children = fooNode
         .getChildrenList(Snapshot.CURRENT_STATE_ID);
@@ -1795,7 +1785,7 @@ public class TestRenameWithSnapshots {
     
     // check dir2
     INode dir2Node = fsdir.getINode4Write(dir2.toString());
-    assertTrue(dir2Node.getClass() == INodeDirectorySnapshottable.class);
+    assertTrue(dir2Node.asDirectory().isSnapshottable());
     Quota.Counts counts = dir2Node.computeQuotaUsage();
     assertEquals(7, counts.get(Quota.NAMESPACE));
     assertEquals(BLOCKSIZE * REPL * 2, counts.get(Quota.DISKSPACE));
@@ -1829,7 +1819,7 @@ public class TestRenameWithSnapshots {
   }
   
   /**
-   * move a directory to its prior descedant
+   * move a directory to its prior descendant
    */
   @Test
   public void testRename2PreDescendant_2() throws Exception {
@@ -1962,12 +1952,12 @@ public class TestRenameWithSnapshots {
     hdfs.deleteSnapshot(sdir2, "s3");
     
     // check
-    final INodeDirectorySnapshottable dir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir1.toString());
+    final INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     Quota.Counts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
     assertEquals(4, q1.get(Quota.NAMESPACE));
-    final INodeDirectorySnapshottable dir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir2.toString());
+    final INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
+        .asDirectory();
     Quota.Counts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
     assertEquals(2, q2.get(Quota.NAMESPACE));
     
@@ -2031,13 +2021,13 @@ public class TestRenameWithSnapshots {
     hdfs.deleteSnapshot(sdir2, "s3");
     
     // check
-    final INodeDirectorySnapshottable dir1Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir1.toString());
+    final INodeDirectory dir1Node = fsdir.getINode4Write(sdir1.toString())
+        .asDirectory();
     // sdir1 + s1 + foo_s1 (foo) + foo (foo + s1 + bar~bar3)
     Quota.Counts q1 = dir1Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
     assertEquals(9, q1.get(Quota.NAMESPACE));
-    final INodeDirectorySnapshottable dir2Node = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(sdir2.toString());
+    final INodeDirectory dir2Node = fsdir.getINode4Write(sdir2.toString())
+        .asDirectory();
     Quota.Counts q2 = dir2Node.getDirectoryWithQuotaFeature().getSpaceConsumed();
     assertEquals(2, q2.get(Quota.NAMESPACE));
     
@@ -2253,8 +2243,8 @@ public class TestRenameWithSnapshots {
     List<DirectoryDiff> barDiffList = barNode.getDiffs().asList();
     assertEquals(1, barDiffList.size());
     DirectoryDiff diff = barDiffList.get(0);
-    INodeDirectorySnapshottable testNode = 
-        (INodeDirectorySnapshottable) fsdir.getINode4Write(test.toString());
+    INodeDirectory testNode = fsdir.getINode4Write(test.toString())
+        .asDirectory();
     Snapshot s0 = testNode.getSnapshot(DFSUtil.string2Bytes("s0"));
     assertEquals(s0.getId(), diff.getSnapshotId());
     // and file should be stored in the deleted list of this snapshot diff
@@ -2266,14 +2256,10 @@ public class TestRenameWithSnapshots {
     INodeDirectory dir2Node = fsdir.getINode4Write(dir2.toString())
         .asDirectory();
     List<DirectoryDiff> dir2DiffList = dir2Node.getDiffs().asList();
-    // dir2Node should contain 2 snapshot diffs, one for s2, and the other was
-    // originally s1 (created when dir2 was transformed to a snapshottable dir),
-    // and currently is s0
-    assertEquals(2, dir2DiffList.size());
-    dList = dir2DiffList.get(1).getChildrenDiff().getList(ListType.DELETED);
+    // dir2Node should contain 1 snapshot diff, for s2
+    assertEquals(1, dir2DiffList.size());
+    dList = dir2DiffList.get(0).getChildrenDiff().getList(ListType.DELETED);
     assertEquals(1, dList.size());
-    cList = dir2DiffList.get(0).getChildrenDiff().getList(ListType.CREATED);
-    assertTrue(cList.isEmpty());
     final Path foo_s2 = SnapshotTestHelper.getSnapshotPath(dir2, "s2", 
         foo.getName());
     INodeReference.WithName fooNode_s2 = 
@@ -2374,4 +2360,46 @@ public class TestRenameWithSnapshots {
     // save namespace and restart
     restartClusterAndCheckImage(true);
   }
+
+  @Test
+  public void testRenameWithOverWrite() throws Exception {
+    final Path root = new Path("/");
+    final Path foo = new Path(root, "foo");
+    final Path file1InFoo = new Path(foo, "file1");
+    final Path file2InFoo = new Path(foo, "file2");
+    final Path file3InFoo = new Path(foo, "file3");
+    DFSTestUtil.createFile(hdfs, file1InFoo, 1L, REPL, SEED);
+    DFSTestUtil.createFile(hdfs, file2InFoo, 1L, REPL, SEED);
+    DFSTestUtil.createFile(hdfs, file3InFoo, 1L, REPL, SEED);
+    final Path bar = new Path(root, "bar");
+    hdfs.mkdirs(bar);
+
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
+    // move file1 from foo to bar
+    final Path fileInBar = new Path(bar, "file1");
+    hdfs.rename(file1InFoo, fileInBar);
+    // rename bar to newDir
+    final Path newDir = new Path(root, "newDir");
+    hdfs.rename(bar, newDir);
+    // move file2 from foo to newDir
+    final Path file2InNewDir = new Path(newDir, "file2");
+    hdfs.rename(file2InFoo, file2InNewDir);
+    // move file3 from foo to newDir and rename it to file1; this will overwrite
+    // the original file1
+    final Path file1InNewDir = new Path(newDir, "file1");
+    hdfs.rename(file3InFoo, file1InNewDir, Rename.OVERWRITE);
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
+
+    SnapshotDiffReport report = hdfs.getSnapshotDiffReport(root, "s0", "s1");
+    LOG.info("DiffList is \n\"" + report.toString() + "\"");
+    List<DiffReportEntry> entries = report.getDiffList();
+    assertEquals(7, entries.size());
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, "", null));
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, foo.getName(), null));
+    assertTrue(existsInDiffReport(entries, DiffType.MODIFY, bar.getName(), null));
+    assertTrue(existsInDiffReport(entries, DiffType.DELETE, "foo/file1", null));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, "bar", "newDir"));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, "foo/file2", "newDir/file2"));
+    assertTrue(existsInDiffReport(entries, DiffType.RENAME, "foo/file3", "newDir/file1"));
+  }
 }

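The edits above all apply one refactoring pattern: call sites that downcast to
the removed INodeDirectorySnapshottable class now work against a plain
INodeDirectory and reach snapshot state through methods on the node itself. A
minimal sketch of the pattern (fsdir and path are hypothetical stand-ins for
the variables used in the tests):

    // old style: hard cast to the dedicated snapshottable subclass
    //   INodeDirectorySnapshottable node =
    //       (INodeDirectorySnapshottable) fsdir.getINode4Write(path);
    // new style: every directory is an INodeDirectory, and snapshot
    // support is queried on the node directly
    INodeDirectory node = fsdir.getINode4Write(path).asDirectory();
    assertTrue(node.isSnapshottable());
    Snapshot s1 = node.getSnapshot(DFSUtil.string2Bytes("s1"));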
Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSetQuotaWithSnapshot.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSetQuotaWithSnapshot.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSetQuotaWithSnapshot.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSetQuotaWithSnapshot.java
 Tue Aug 19 23:49:39 2014
@@ -112,23 +112,20 @@ public class TestSetQuotaWithSnapshot {
     hdfs.allowSnapshot(dir);
     hdfs.setQuota(dir, HdfsConstants.QUOTA_DONT_SET,
         HdfsConstants.QUOTA_DONT_SET);
-    INode dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(0, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    INodeDirectory dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(0, dirNode.getDiffs().asList().size());
     
     hdfs.setQuota(dir, HdfsConstants.QUOTA_DONT_SET - 1,
         HdfsConstants.QUOTA_DONT_SET - 1);
-    dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(0, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(0, dirNode.getDiffs().asList().size());
     
     hdfs.setQuota(dir, HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_RESET);
-    dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(0, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(0, dirNode.getDiffs().asList().size());
     
     // allow snapshot on dir and create snapshot s1
     SnapshotTestHelper.createSnapshot(hdfs, dir, "s1");
@@ -136,10 +133,9 @@ public class TestSetQuotaWithSnapshot {
     // clear quota of dir
     hdfs.setQuota(dir, HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_RESET);
     // dir should still be a snapshottable directory
-    dirNode = fsdir.getINode4Write(dir.toString());
-    assertTrue(dirNode instanceof INodeDirectorySnapshottable);
-    assertEquals(1, ((INodeDirectorySnapshottable) dirNode).getDiffs().asList()
-        .size());
+    dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
+    assertTrue(dirNode.isSnapshottable());
+    assertEquals(1, dirNode.getDiffs().asList().size());
     SnapshottableDirectoryStatus[] status = hdfs.getSnapshottableDirListing();
     assertEquals(1, status.length);
     assertEquals(dir, status[0].getFullPath());
@@ -154,8 +150,7 @@ public class TestSetQuotaWithSnapshot {
     assertTrue(subNode.asDirectory().isWithSnapshot());
     List<DirectoryDiff> diffList = subNode.asDirectory().getDiffs().asList();
     assertEquals(1, diffList.size());
-    Snapshot s2 = ((INodeDirectorySnapshottable) dirNode).getSnapshot(DFSUtil
-        .string2Bytes("s2"));
+    Snapshot s2 = dirNode.getSnapshot(DFSUtil.string2Bytes("s2"));
     assertEquals(s2.getId(), diffList.get(0).getSnapshotId());
     List<INode> createdList = diffList.get(0).getChildrenDiff().getList(ListType.CREATED);
     assertEquals(1, createdList.size());

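The instanceof assertions in this file reduce to the same boolean query. Note
that the test re-resolves dirNode after each setQuota call rather than caching
the reference; one round of the check, sketched under the assumption that dir
is already a snapshottable path:

    hdfs.setQuota(dir, HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_RESET);
    // re-resolve the inode: clearing the quota must not strip snapshottability
    INodeDirectory dirNode = fsdir.getINode4Write(dir.toString()).asDirectory();
    assertTrue(dirNode.isSnapshottable());
    assertEquals(0, dirNode.getDiffs().asList().size());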
Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
 Tue Aug 19 23:49:39 2014
@@ -430,30 +430,31 @@ public class TestSnapshot {
         .asDirectory();
     assertTrue(rootNode.isSnapshottable());
     // root is snapshottable dir, but with 0 snapshot quota
-    assertEquals(0, ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(0, rootNode.getDirectorySnapshottableFeature()
+        .getSnapshotQuota());
     
     hdfs.allowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(INodeDirectorySnapshottable.SNAPSHOT_LIMIT,
-        ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(DirectorySnapshottableFeature.SNAPSHOT_LIMIT,
+        rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
     // call allowSnapshot again
     hdfs.allowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(INodeDirectorySnapshottable.SNAPSHOT_LIMIT,
-        ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(DirectorySnapshottableFeature.SNAPSHOT_LIMIT,
+        rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
     
     // disallowSnapshot on dir
     hdfs.disallowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(0, ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(0, rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
     // do it again
     hdfs.disallowSnapshot(root);
     rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
     assertTrue(rootNode.isSnapshottable());
-    assertEquals(0, ((INodeDirectorySnapshottable) rootNode).getSnapshotQuota());
+    assertEquals(0, rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
   }
 
   /**

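With the subclass gone, SNAPSHOT_LIMIT and the per-directory snapshot quota
live on DirectorySnapshottableFeature, reached through
getDirectorySnapshottableFeature(). A sketch of the round trip the test
asserts, assuming root is the filesystem root and rootNode is re-resolved
after each call as the test does:

    hdfs.allowSnapshot(root);  // raises the quota to the feature-wide limit
    rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
    assertEquals(DirectorySnapshottableFeature.SNAPSHOT_LIMIT,
        rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());
    hdfs.disallowSnapshot(root);  // root stays snapshottable, quota drops to 0
    rootNode = fsdir.getINode4Write(root.toString()).asDirectory();
    assertEquals(0,
        rootNode.getDirectorySnapshottableFeature().getSnapshotQuota());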
Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
 Tue Aug 19 23:49:39 2014
@@ -28,12 +28,14 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -396,4 +398,39 @@ public class TestSnapshotBlocksMap {
     assertEquals(1, blks.length);
     assertEquals(BLOCKSIZE, blks[0].getNumBytes());
   }
+  
+  /**
+   * Make sure that a delete of a non-zero-length file which results in a
+   * zero-length file in a snapshot works.
+   */
+  @Test
+  public void testDeletionOfLaterBlocksWithZeroSizeFirstBlock() throws Exception {
+    final Path foo = new Path("/foo");
+    final Path bar = new Path(foo, "bar");
+    final byte[] testData = "foo bar baz".getBytes();
+    
+    // Create a zero-length file.
+    DFSTestUtil.createFile(hdfs, bar, 0, REPLICATION, 0L);
+    assertEquals(0, fsdir.getINode4Write(bar.toString()).asFile().getBlocks().length);
+
+    // Create a snapshot that includes that file.
+    SnapshotTestHelper.createSnapshot(hdfs, foo, "s0");
+    
+    // Extend that file.
+    FSDataOutputStream out = hdfs.append(bar);
+    out.write(testData);
+    out.close();
+    INodeFile barNode = fsdir.getINode4Write(bar.toString()).asFile();
+    BlockInfo[] blks = barNode.getBlocks();
+    assertEquals(1, blks.length);
+    assertEquals(testData.length, blks[0].getNumBytes());
+    
+    // Delete the file.
+    hdfs.delete(bar, true);
+    
+    // Now make sure that the NN can still save an fsimage successfully.
+    cluster.getNameNode().getRpcServer().setSafeMode(
+        SafeModeAction.SAFEMODE_ENTER, false);
+    cluster.getNameNode().getRpcServer().saveNamespace();
+  }
 }

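The new regression test ends by forcing an fsimage save, which per its own
comments is the operation being guarded. saveNamespace requires the NameNode
to be in safe mode first; the idiom, sketched with a hypothetical trailing
SAFEMODE_LEAVE that a test continuing past the save would add:

    // enter safe mode, persist the namespace, then resume normal operation
    cluster.getNameNode().getRpcServer().setSafeMode(
        SafeModeAction.SAFEMODE_ENTER, false);
    cluster.getNameNode().getRpcServer().saveNamespace();
    cluster.getNameNode().getRpcServer().setSafeMode(
        SafeModeAction.SAFEMODE_LEAVE, false);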
Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
 Tue Aug 19 23:49:39 2014
@@ -281,10 +281,10 @@ public class TestSnapshotDeletion {
     checkQuotaUsageComputation(dir, 14L, BLOCKSIZE * REPLICATION * 4);
     
     // get two snapshots for later use
-    Snapshot snapshot0 = ((INodeDirectorySnapshottable) fsdir.getINode(dir
-        .toString())).getSnapshot(DFSUtil.string2Bytes("s0"));
-    Snapshot snapshot1 = ((INodeDirectorySnapshottable) fsdir.getINode(dir
-        .toString())).getSnapshot(DFSUtil.string2Bytes("s1"));
+    Snapshot snapshot0 = fsdir.getINode(dir.toString()).asDirectory()
+        .getSnapshot(DFSUtil.string2Bytes("s0"));
+    Snapshot snapshot1 = fsdir.getINode(dir.toString()).asDirectory()
+        .getSnapshot(DFSUtil.string2Bytes("s1"));
     
     // Case 2 + Case 3: delete noChangeDirParent, noChangeFile, and
     // metaChangeFile2. Note that when we directly delete a directory, the 
@@ -509,8 +509,7 @@ public class TestSnapshotDeletion {
     }
     
     // check 1. there is no snapshot s0
-    final INodeDirectorySnapshottable dirNode = 
-        (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
+    final INodeDirectory dirNode = fsdir.getINode(dir.toString()).asDirectory();
     Snapshot snapshot0 = dirNode.getSnapshot(DFSUtil.string2Bytes("s0"));
     assertNull(snapshot0);
     Snapshot snapshot1 = dirNode.getSnapshot(DFSUtil.string2Bytes("s1"));

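The rewritten check also leans on getSnapshot's lookup semantics: it searches
by name and returns null when no snapshot of that name exists, so a deleted
snapshot can be asserted directly. A sketch, assuming dir once had s0 (now
deleted) and still has s1:

    INodeDirectory dirNode = fsdir.getINode(dir.toString()).asDirectory();
    assertNull(dirNode.getSnapshot(DFSUtil.string2Bytes("s0")));    // deleted
    assertNotNull(dirNode.getSnapshot(DFSUtil.string2Bytes("s1"))); // remains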
Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDiffReport.java
 Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -143,7 +144,7 @@ public class TestSnapshotDiffReport {
       hdfs.createSnapshot(snapshotDir, genSnapshotName(snapshotDir));
     }
     // modify file10
-    hdfs.setReplication(file10, (short) (REPLICATION - 1));
+    hdfs.setReplication(file10, (short) (REPLICATION + 1));
   }
   
   /** check the correctness of the diff reports */
@@ -166,11 +167,11 @@ public class TestSnapshotDiffReport {
       } else if (entry.getType() == DiffType.DELETE) {
         assertTrue(report.getDiffList().contains(entry));
         assertTrue(inverseReport.getDiffList().contains(
-            new DiffReportEntry(DiffType.CREATE, entry.getRelativePath())));
+            new DiffReportEntry(DiffType.CREATE, entry.getSourcePath())));
       } else if (entry.getType() == DiffType.CREATE) {
         assertTrue(report.getDiffList().contains(entry));
         assertTrue(inverseReport.getDiffList().contains(
-            new DiffReportEntry(DiffType.DELETE, entry.getRelativePath())));
+            new DiffReportEntry(DiffType.DELETE, entry.getSourcePath())));
       }
     }
   }
@@ -329,5 +330,166 @@ public class TestSnapshotDiffReport {
         new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
         new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("subsub1")));
   }
-  
+
+  /**
+   * Rename a directory to its prior descendant, and verify the diff report.
+   */
+  @Test
+  public void testDiffReportWithRename() throws Exception {
+    final Path root = new Path("/");
+    final Path sdir1 = new Path(root, "dir1");
+    final Path sdir2 = new Path(root, "dir2");
+    final Path foo = new Path(sdir1, "foo");
+    final Path bar = new Path(foo, "bar");
+    hdfs.mkdirs(bar);
+    hdfs.mkdirs(sdir2);
+
+    // create snapshot on root
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
+
+    // /dir1/foo/bar -> /dir2/bar
+    final Path bar2 = new Path(sdir2, "bar");
+    hdfs.rename(bar, bar2);
+
+    // /dir1/foo -> /dir2/bar/foo
+    final Path foo2 = new Path(bar2, "foo");
+    hdfs.rename(foo, foo2);
+
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s2");
+    // let's delete /dir2 to make things more complicated
+    hdfs.delete(sdir2, true);
+
+    verifyDiffReport(root, "s1", "s2",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir1")),
+        new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("dir1/foo"),
+            DFSUtil.string2Bytes("dir2/bar/foo")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir2")),
+        new DiffReportEntry(DiffType.MODIFY,
+            DFSUtil.string2Bytes("dir1/foo/bar")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir1/foo")),
+        new DiffReportEntry(DiffType.RENAME, DFSUtil
+            .string2Bytes("dir1/foo/bar"), DFSUtil.string2Bytes("dir2/bar")));
+  }
+
+  /**
+   * Renaming a file/dir to outside the snapshottable dir should be reported
+   * as deleted; renaming one in from outside should be reported as created.
+   */
+  @Test
+  public void testDiffReportWithRenameOutside() throws Exception {
+    final Path root = new Path("/");
+    final Path dir1 = new Path(root, "dir1");
+    final Path dir2 = new Path(root, "dir2");
+    final Path foo = new Path(dir1, "foo");
+    final Path fileInFoo = new Path(foo, "file");
+    final Path bar = new Path(dir2, "bar");
+    final Path fileInBar = new Path(bar, "file");
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
+
+    // create snapshot on /dir1
+    SnapshotTestHelper.createSnapshot(hdfs, dir1, "s0");
+
+    // move bar into dir1
+    final Path newBar = new Path(dir1, "newBar");
+    hdfs.rename(bar, newBar);
+    // move foo out of dir1 into dir2
+    final Path newFoo = new Path(dir2, "new");
+    hdfs.rename(foo, newFoo);
+
+    SnapshotTestHelper.createSnapshot(hdfs, dir1, "s1");
+    verifyDiffReport(dir1, "s0", "s1",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes(newBar
+            .getName())),
+        new DiffReportEntry(DiffType.DELETE,
+            DFSUtil.string2Bytes(foo.getName())));
+  }
+
+  /**
+   * Renaming a file/dir and then deleting the ancestor dir of the rename
+   * target should be reported as deleted.
+   */
+  @Test
+  public void testDiffReportWithRenameAndDelete() throws Exception {
+    final Path root = new Path("/");
+    final Path dir1 = new Path(root, "dir1");
+    final Path dir2 = new Path(root, "dir2");
+    final Path foo = new Path(dir1, "foo");
+    final Path fileInFoo = new Path(foo, "file");
+    final Path bar = new Path(dir2, "bar");
+    final Path fileInBar = new Path(bar, "file");
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
+    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPLICATION, seed);
+
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
+    hdfs.rename(fileInFoo, fileInBar, Rename.OVERWRITE);
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
+    verifyDiffReport(root, "s0", "s1",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir1/foo")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir2/bar")),
+        new DiffReportEntry(DiffType.DELETE, DFSUtil
+            .string2Bytes("dir2/bar/file")),
+        new DiffReportEntry(DiffType.RENAME,
+            DFSUtil.string2Bytes("dir1/foo/file"),
+            DFSUtil.string2Bytes("dir2/bar/file")));
+
+    // delete bar
+    hdfs.delete(bar, true);
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s2");
+    verifyDiffReport(root, "s0", "s2",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir1/foo")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir2")),
+        new DiffReportEntry(DiffType.DELETE, DFSUtil.string2Bytes("dir2/bar")),
+        new DiffReportEntry(DiffType.DELETE,
+            DFSUtil.string2Bytes("dir1/foo/file")));
+  }
+
+  @Test
+  public void testDiffReportWithRenameToNewDir() throws Exception {
+    final Path root = new Path("/");
+    final Path foo = new Path(root, "foo");
+    final Path fileInFoo = new Path(foo, "file");
+    DFSTestUtil.createFile(hdfs, fileInFoo, BLOCKSIZE, REPLICATION, seed);
+
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
+    final Path bar = new Path(root, "bar");
+    hdfs.mkdirs(bar);
+    final Path fileInBar = new Path(bar, "file");
+    hdfs.rename(fileInFoo, fileInBar);
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
+
+    verifyDiffReport(root, "s0", "s1",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("foo")),
+        new DiffReportEntry(DiffType.CREATE, DFSUtil.string2Bytes("bar")),
+        new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("foo/file"),
+            DFSUtil.string2Bytes("bar/file")));
+  }
+
+  /**
+   * Rename a file and then append some data to it
+   */
+  @Test
+  public void testDiffReportWithRenameAndAppend() throws Exception {
+    final Path root = new Path("/");
+    final Path foo = new Path(root, "foo");
+    DFSTestUtil.createFile(hdfs, foo, BLOCKSIZE, REPLICATION, seed);
+
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s0");
+    final Path bar = new Path(root, "bar");
+    hdfs.rename(foo, bar);
+    DFSTestUtil.appendFile(hdfs, bar, 10); // append 10 bytes
+    SnapshotTestHelper.createSnapshot(hdfs, root, "s1");
+
+    // the modification is always recorded against the file's pre-rename path
+    verifyDiffReport(root, "s0", "s1",
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("")),
+        new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("foo")),
+        new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("foo"),
+            DFSUtil.string2Bytes("bar")));
+  }
 }
\ No newline at end of file

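A closing note on the DiffReportEntry shapes the new diff-report tests
exercise: MODIFY, CREATE and DELETE entries carry a single path, while RENAME
entries carry a source and a target path, which is what the inverse-report
check matches through getSourcePath(). Both constructor forms as used above:

    // single-path entry, used for MODIFY/CREATE/DELETE
    new DiffReportEntry(DiffType.MODIFY, DFSUtil.string2Bytes("dir1"));
    // two-path entry: a RENAME records source -> target
    new DiffReportEntry(DiffType.RENAME, DFSUtil.string2Bytes("foo"),
        DFSUtil.string2Bytes("bar"));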
