HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits. Contributed by Ming Ma.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58b91c94
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58b91c94
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58b91c94

Branch: refs/heads/YARN-2928
Commit: 58b91c94086f7e2cacd07087c9e7a0d28b2355d3
Parents: ab02e31
Author: Jing Zhao <ji...@apache.org>
Authored: Fri May 29 11:05:13 2015 -0700
Committer: Zhijie Shen <zjs...@apache.org>
Committed: Tue Jun 2 16:12:57 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../hdfs/server/namenode/FSNamesystem.java      | 20 -----------
 .../hdfs/server/namenode/NameNodeRpcServer.java | 22 ++++++++++--
 .../namenode/ha/TestRetryCacheWithHA.java       | 37 ++++++++++++++++++--
 4 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
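
As the subject says, the goal is to avoid a retry-cache collision on a Standby NameNode while it is loading edits. Before this patch, each mutating RPC in NameNodeRpcServer consulted the retry cache (RetryCache.waitForCompletion) first, and the state check that rejects writes on a Standby, FSNamesystem#checkOperation(OperationCategory.WRITE), only ran afterwards inside FSNamesystem. A Standby could therefore record a retry-cache entry for a call it never executed, and that entry could collide with the entry added for the same client call when the Standby applies the corresponding edit from the Active. The patch moves the checkOperation(WRITE) call up into NameNodeRpcServer, ahead of the retry-cache lookup, so a Standby rejects the call before any cache entry exists. The sketch below only illustrates that ordering with simplified stand-in types; it is not the real HDFS code, and every name in it is illustrative.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Simplified, self-contained stand-ins (not the real NameNode classes); the
// point is only the ordering: reject Standby traffic before the cache lookup.
public class RetryCacheOrderingSketch {

  static class StandbyException extends IOException {}

  enum HAState { ACTIVE, STANDBY }

  // Stand-in retry cache keyed by (clientId, callId).
  static class RetryCache {
    private final Set<String> entries = new HashSet<>();
    // Returns false when an entry for the same call already exists.
    boolean addEntry(String clientId, int callId) {
      return entries.add(clientId + ":" + callId);
    }
  }

  private final HAState state;
  private final RetryCache retryCache = new RetryCache();

  RetryCacheOrderingSketch(HAState state) { this.state = state; }

  // Stand-in for FSNamesystem#checkOperation(OperationCategory.WRITE).
  void checkOperation() throws StandbyException {
    if (state == HAState.STANDBY) {
      throw new StandbyException();
    }
  }

  // Patched ordering: the state check runs before the retry cache is touched.
  void writeRpc(String clientId, int callId) throws IOException {
    checkOperation();                          // throws on a Standby
    if (!retryCache.addEntry(clientId, callId)) {
      throw new IOException("retry cache collision: " + clientId + ":" + callId);
    }
    // ... perform the write and log the edit ...
  }

  // Entry added while the Standby applies the edit logged by the Active.
  void applyEditFromActive(String clientId, int callId) {
    retryCache.addEntry(clientId, callId);
  }

  public static void main(String[] args) throws IOException {
    RetryCacheOrderingSketch standby = new RetryCacheOrderingSketch(HAState.STANDBY);
    try {
      standby.writeRpc("client-1", 7);         // rejected before any cache entry
    } catch (StandbyException expected) {
      // no retry-cache entry was recorded for the rejected call
    }
    standby.applyEditFromActive("client-1", 7); // so this cannot collide
  }
}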


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58b91c94/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 23699a3..cc8235c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -822,6 +822,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7401. Add block info to DFSInputStream' WARN message when it adds
     node to deadNodes (Arshad Mohammad via vinayakumarb)
 
+    HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits.
+    (Ming Ma via jing9)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58b91c94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 798f8d5..5ed069d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1887,7 +1886,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void concat(String target, String [] srcs, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     waitForLoadingFSImage();
     HdfsFileStatus stat = null;
     boolean success = false;
@@ -2162,7 +2161,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     waitForLoadingFSImage();
     HdfsFileStatus auditStat = null;
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -2379,7 +2377,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new InvalidPathException(src);
     }
     blockManager.verifyReplication(src, replication, clientMachine);
-    checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
       throw new IOException("Specified block size is less than configured" +
           " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
@@ -2786,7 +2783,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     LocatedBlock lb = null;
     HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
@@ -3081,7 +3077,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean renameTo(String src, String dst, boolean logRetryCache)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     FSDirRenameOp.RenameOldResult ret = null;
     writeLock();
     try {
@@ -3107,7 +3102,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 boolean logRetryCache, Options.Rename... options)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null;
     writeLock();
     try {
@@ -3144,7 +3138,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean delete(String src, boolean recursive, boolean logRetryCache)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     BlocksMapUpdateInfo toRemovedBlocks = null;
     writeLock();
     boolean ret = false;
@@ -5762,8 +5755,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
       DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
     LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
              + ", newGS=" + newBlock.getGenerationStamp()
              + ", newLength=" + newBlock.getNumBytes()
@@ -6744,7 +6735,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void renameSnapshot(
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     try {
@@ -6832,7 +6822,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void deleteSnapshot(String snapshotRoot, String snapshotName,
       boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     BlocksMapUpdateInfo blocksToBeDeleted = null;
@@ -7057,7 +7046,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   long addCacheDirective(CacheDirectiveInfo directive,
                          EnumSet<CacheFlag> flags, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     CacheDirectiveInfo effectiveDirective = null;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
@@ -7085,7 +7073,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
@@ -7109,7 +7096,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     try {
@@ -7148,7 +7134,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void addCachePool(CachePoolInfo req, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     String poolInfoStr = null;
@@ -7170,7 +7155,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     try {
@@ -7192,7 +7176,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void removeCachePool(String cachePoolName, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     try {
@@ -7387,7 +7370,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
     checkSuperuserPrivilege();
-    checkOperation(OperationCategory.WRITE);
     final byte[][] pathComponents =
       FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = getPermissionChecker();
@@ -7473,7 +7455,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
                 boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     HdfsFileStatus auditStat = null;
     writeLock();
     try {
@@ -7521,7 +7502,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     HdfsFileStatus auditStat = null;
     writeLock();
     try {
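
Every hunk above removes a checkOperation(OperationCategory.WRITE) call that FSNamesystem made only after NameNodeRpcServer had already consulted the retry cache; the check taken again under the namesystem lock (visible, for example, in the context of the +2161 hunk) is left in place. The equivalent early check is added in NameNodeRpcServer, before the retry-cache lookup, in the next file.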

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58b91c94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0d416a6..dafa23e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -616,7 +616,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("create: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (HdfsFileStatus) cacheEntry.getPayload();
@@ -647,6 +647,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       stateChangeLog.debug("*DIR* NameNode.append: file "
           +src+" for "+clientName+" at "+clientMachine);
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
         null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -794,6 +795,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -838,7 +840,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("rename: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
@@ -859,6 +861,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void concat(String trg, String[] src) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -884,6 +887,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("rename: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -922,6 +926,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
           + ", recursive=" + recursive);
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
@@ -1207,6 +1212,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void createSymlink(String target, String link, FsPermission dirPerms,
       boolean createParent) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1537,6 +1543,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("createSnapshot: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
       null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1558,6 +1565,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrDeleteSnapshotOps();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1596,6 +1604,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     if (snapshotNewName == null || snapshotNewName.isEmpty()) {
       throw new IOException("The new snapshot name is null or empty.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrRenameSnapshotOps();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1635,6 +1644,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public long addCacheDirective(
       CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
       (retryCache, null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1656,6 +1666,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void modifyCacheDirective(
       CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1673,6 +1684,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void removeCacheDirective(long id) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1699,6 +1711,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override //ClientProtocol
   public void addCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1715,6 +1728,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void modifyCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1731,6 +1745,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void removeCachePool(String cachePoolName) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1793,6 +1808,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void createEncryptionZone(String src, String keyName)
     throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1824,6 +1840,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1853,6 +1870,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
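
The pattern is the same in every mutating RPC of this file: namesystem.checkOperation(OperationCategory.WRITE) now runs after checkNNStartup() and before RetryCache.waitForCompletion(retryCache). On a Standby the call fails with StandbyException before a cache entry is created, and the client's HA failover retry policy treats that exception as a cue to retry against the other NameNode, so for these operations the Standby's retry cache is populated only while it applies edits.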

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58b91c94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index e3572ab..d202fb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -215,7 +215,8 @@ public class TestRetryCacheWithHA {
   abstract class AtMostOnceOp {
     private final String name;
     final DFSClient client;
-    
+    int expectedUpdateCount = 0;
+
     AtMostOnceOp(String name, DFSClient client) {
       this.name = name;
       this.client = client;
@@ -225,6 +226,9 @@ public class TestRetryCacheWithHA {
     abstract void invoke() throws Exception;
     abstract boolean checkNamenodeBeforeReturn() throws Exception;
     abstract Object getResult();
+    int getExpectedCacheUpdateCount() {
+      return expectedUpdateCount;
+    }
   }
   
   /** createSnapshot operation */
@@ -604,7 +608,7 @@ public class TestRetryCacheWithHA {
   class DeleteOp extends AtMostOnceOp {
     private final String target;
     private boolean deleted;
-    
+
     DeleteOp(DFSClient client, String target) {
       super("delete", client);
       this.target = target;
@@ -614,12 +618,14 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(target);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       deleted = client.delete(target, true);
     }
 
@@ -655,12 +661,14 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(target);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.createSymlink(target, link, false);
     }
 
@@ -773,11 +781,13 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
@@ -819,12 +829,15 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      expectedUpdateCount++;
       id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.modifyCacheDirective(
           new CacheDirectiveInfo.Builder().
               setId(id).
@@ -875,12 +888,15 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      expectedUpdateCount++;
       id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeCacheDirective(id);
     }
 
@@ -922,6 +938,7 @@ public class TestRetryCacheWithHA {
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool));
     }
 
@@ -954,11 +971,13 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
     }
 
@@ -991,11 +1010,13 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeCachePool(pool);
     }
 
@@ -1030,12 +1051,14 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(src);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.setXAttr(src, "user.key", "value".getBytes(),
           EnumSet.of(XAttrSetFlag.CREATE));
     }
@@ -1072,7 +1095,9 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(src);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+        expectedUpdateCount++;
         client.setXAttr(src, "user.key", "value".getBytes(),
           EnumSet.of(XAttrSetFlag.CREATE));
       }
@@ -1080,6 +1105,7 @@ public class TestRetryCacheWithHA {
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeXAttr(src, "user.key");
     }
 
@@ -1316,6 +1342,13 @@ public class TestRetryCacheWithHA {
     assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
     // Cache updated metrics on NN1 should be >0 since NN1 applied the editlog
     assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
+    long expectedUpdateCount = op.getExpectedCacheUpdateCount();
+    if (expectedUpdateCount > 0) {
+      assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
+          updatedNN0);
+      assertEquals("CacheUpdated on NN1: " + updatedNN1, expectedUpdateCount,
+          updatedNN1);
+    }
   }
 
   /**
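
The test change gives each AtMostOnceOp an expectedUpdateCount that is bumped once for every call expected to add a retry-cache entry, in both prepare() and invoke(). The final assertions then compare the CacheUpdated metric on both NameNodes against that count; operations that do not set the counter fall back to the pre-existing "> 0" checks thanks to the "expectedUpdateCount > 0" guard. Unexpected cache updates on either NameNode now fail the exact-count comparison instead of slipping past the looser checks.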
