HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits. Contributed by Ming Ma.
(cherry picked from commit 7817674a3a4d097b647dd77f1345787dd376d5ea) (cherry picked from commit 17fb442a4c4e43105374c97fccd68dd966729a19) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6c127b44 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6c127b44 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6c127b44 Branch: refs/heads/branch-2.7 Commit: 6c127b44ca1aede95e29b81cf1c0479ae84a8c71 Parents: 188096f Author: Jing Zhao <ji...@apache.org> Authored: Fri May 29 11:05:13 2015 -0700 Committer: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vino...@apache.org> Committed: Thu Sep 10 11:56:28 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../hdfs/server/namenode/FSNamesystem.java | 19 ---------- .../hdfs/server/namenode/NameNodeRpcServer.java | 22 ++++++++++-- .../namenode/ha/TestRetryCacheWithHA.java | 37 ++++++++++++++++++-- 4 files changed, 58 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c127b44/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cd5a5ea..d22f592 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -53,6 +53,9 @@ Release 2.7.2 - UNRELEASED HDFS-8431. hdfs crypto class not found in Windows. (Anu Engineer via cnauroth) + HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits. + (Ming Ma via jing9) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c127b44/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 2dda12b..6998e56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1874,7 +1874,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, */ void concat(String target, String [] srcs, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); waitForLoadingFSImage(); HdfsFileStatus stat = null; boolean success = false; @@ -2376,7 +2375,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean skipSync = false; HdfsFileStatus stat = null; FSPermissionChecker pc = getPermissionChecker(); - checkOperation(OperationCategory.WRITE); if (blockSize < minBlockSize) { throw new IOException("Specified block size is less than configured" + " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY @@ -2974,7 +2972,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, LocatedBlock lb = null; HdfsFileStatus stat = null; FSPermissionChecker pc = getPermissionChecker(); - checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); writeLock(); try { @@ -3645,7 +3642,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean renameTo(String src, String dst, boolean logRetryCache) throws IOException { waitForLoadingFSImage(); - checkOperation(OperationCategory.WRITE); FSDirRenameOp.RenameOldResult ret = null; writeLock(); try { @@ -3671,7 +3667,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean logRetryCache, Options.Rename... options) throws IOException { waitForLoadingFSImage(); - checkOperation(OperationCategory.WRITE); Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null; writeLock(); try { @@ -3708,7 +3703,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean delete(String src, boolean recursive, boolean logRetryCache) throws IOException { waitForLoadingFSImage(); - checkOperation(OperationCategory.WRITE); BlocksMapUpdateInfo toRemovedBlocks = null; writeLock(); boolean ret = false; @@ -6332,8 +6326,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); - LOG.info("updatePipeline(" + oldBlock.getLocalBlock() + ", newGS=" + newBlock.getGenerationStamp() + ", newLength=" + newBlock.getNumBytes() @@ -7370,7 +7362,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, void renameSnapshot( String path, String snapshotOldName, String snapshotNewName, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); boolean success = false; writeLock(); try { @@ -7454,7 +7445,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, */ void deleteSnapshot(String snapshotRoot, String snapshotName, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); boolean success = false; writeLock(); BlocksMapUpdateInfo blocksToBeDeleted = null; @@ -7682,7 +7672,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); CacheDirectiveInfo effectiveDirective = null; if (!flags.contains(CacheFlag.FORCE)) { cacheManager.waitForRescanIfNeeded(); @@ -7713,7 +7702,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, void modifyCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); boolean success = false; if (!flags.contains(CacheFlag.FORCE)) { cacheManager.waitForRescanIfNeeded(); @@ -7740,7 +7728,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } void removeCacheDirective(long id, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); boolean success = false; writeLock(); try { @@ -7782,7 +7769,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, void addCachePool(CachePoolInfo req, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); writeLock(); boolean success = false; String poolInfoStr = null; @@ -7806,7 +7792,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, void modifyCachePool(CachePoolInfo req, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); writeLock(); boolean success = false; try { @@ -7830,7 +7815,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, void removeCachePool(String cachePoolName, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); writeLock(); boolean success = false; try { @@ -8028,7 +8012,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, String src = srcArg; HdfsFileStatus resultingStat = null; checkSuperuserPrivilege(); - checkOperation(OperationCategory.WRITE); final byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); FSPermissionChecker pc = getPermissionChecker(); @@ -8114,7 +8097,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); HdfsFileStatus auditStat = null; writeLock(); try { @@ -8162,7 +8144,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, void removeXAttr(String src, XAttr xAttr, boolean logRetryCache) throws IOException { - checkOperation(OperationCategory.WRITE); HdfsFileStatus auditStat = null; writeLock(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c127b44/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index c057f3e..dfd51f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -610,7 +610,7 @@ class NameNodeRpcServer implements NamenodeProtocols { throw new IOException("create: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } - + namesystem.checkOperation(OperationCategory.WRITE); CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (HdfsFileStatus) cacheEntry.getPayload(); @@ -641,6 +641,7 @@ class NameNodeRpcServer implements NamenodeProtocols { stateChangeLog.debug("*DIR* NameNode.append: file " +src+" for "+clientName+" at "+clientMachine); } + namesystem.checkOperation(OperationCategory.WRITE); CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { @@ -810,6 +811,7 @@ class NameNodeRpcServer implements NamenodeProtocols { ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response @@ -854,7 +856,7 @@ class NameNodeRpcServer implements NamenodeProtocols { throw new IOException("rename: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } - + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return true; // Return previous response @@ -875,6 +877,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void concat(String trg, String[] src) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response @@ -900,6 +903,7 @@ class NameNodeRpcServer implements NamenodeProtocols { throw new IOException("rename: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response @@ -937,6 +941,7 @@ class NameNodeRpcServer implements NamenodeProtocols { stateChangeLog.debug("*DIR* Namenode.delete: src=" + src + ", recursive=" + recursive); } + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return true; // Return previous response @@ -1221,6 +1226,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public void createSymlink(String target, String link, FsPermission dirPerms, boolean createParent) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response @@ -1552,6 +1558,7 @@ class NameNodeRpcServer implements NamenodeProtocols { throw new IOException("createSnapshot: Pathname too long. Limit " + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } + namesystem.checkOperation(OperationCategory.WRITE); CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { @@ -1573,6 +1580,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public void deleteSnapshot(String snapshotRoot, String snapshotName) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); metrics.incrDeleteSnapshotOps(); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { @@ -1611,6 +1619,7 @@ class NameNodeRpcServer implements NamenodeProtocols { if (snapshotNewName == null || snapshotNewName.isEmpty()) { throw new IOException("The new snapshot name is null or empty."); } + namesystem.checkOperation(OperationCategory.WRITE); metrics.incrRenameSnapshotOps(); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { @@ -1650,6 +1659,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public long addCacheDirective( CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion (retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { @@ -1671,6 +1681,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public void modifyCacheDirective( CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; @@ -1688,6 +1699,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void removeCacheDirective(long id) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; @@ -1714,6 +1726,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override //ClientProtocol public void addCachePool(CachePoolInfo info) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response @@ -1730,6 +1743,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void modifyCachePool(CachePoolInfo info) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response @@ -1746,6 +1760,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void removeCachePool(String cachePoolName) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; @@ -1808,6 +1823,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public void createEncryptionZone(String src, String keyName) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; @@ -1839,6 +1855,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response @@ -1868,6 +1885,7 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void removeXAttr(String src, XAttr xAttr) throws IOException { checkNNStartup(); + namesystem.checkOperation(OperationCategory.WRITE); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c127b44/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index c0d320c..1c270aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -214,7 +214,8 @@ public class TestRetryCacheWithHA { abstract class AtMostOnceOp { private final String name; final DFSClient client; - + int expectedUpdateCount = 0; + AtMostOnceOp(String name, DFSClient client) { this.name = name; this.client = client; @@ -224,6 +225,9 @@ public class TestRetryCacheWithHA { abstract void invoke() throws Exception; abstract boolean checkNamenodeBeforeReturn() throws Exception; abstract Object getResult(); + int getExpectedCacheUpdateCount() { + return expectedUpdateCount; + } } /** createSnapshot operaiton */ @@ -603,7 +607,7 @@ public class TestRetryCacheWithHA { class DeleteOp extends AtMostOnceOp { private final String target; private boolean deleted; - + DeleteOp(DFSClient client, String target) { super("delete", client); this.target = target; @@ -613,12 +617,14 @@ public class TestRetryCacheWithHA { void prepare() throws Exception { Path p = new Path(target); if (!dfs.exists(p)) { + expectedUpdateCount++; DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); } } @Override void invoke() throws Exception { + expectedUpdateCount++; deleted = client.delete(target, true); } @@ -654,12 +660,14 @@ public class TestRetryCacheWithHA { void prepare() throws Exception { Path p = new Path(target); if (!dfs.exists(p)) { + expectedUpdateCount++; DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); } } @Override void invoke() throws Exception { + expectedUpdateCount++; client.createSymlink(target, link, false); } @@ -772,11 +780,13 @@ public class TestRetryCacheWithHA { @Override void prepare() throws Exception { + expectedUpdateCount++; dfs.addCachePool(new CachePoolInfo(directive.getPool())); } @Override void invoke() throws Exception { + expectedUpdateCount++; result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); } @@ -818,12 +828,15 @@ public class TestRetryCacheWithHA { @Override void prepare() throws Exception { + expectedUpdateCount++; dfs.addCachePool(new CachePoolInfo(directive.getPool())); + expectedUpdateCount++; id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); } @Override void invoke() throws Exception { + expectedUpdateCount++; client.modifyCacheDirective( new CacheDirectiveInfo.Builder(). setId(id). @@ -874,12 +887,15 @@ public class TestRetryCacheWithHA { @Override void prepare() throws Exception { + expectedUpdateCount++; dfs.addCachePool(new CachePoolInfo(directive.getPool())); + expectedUpdateCount++; id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); } @Override void invoke() throws Exception { + expectedUpdateCount++; client.removeCacheDirective(id); } @@ -921,6 +937,7 @@ public class TestRetryCacheWithHA { @Override void invoke() throws Exception { + expectedUpdateCount++; client.addCachePool(new CachePoolInfo(pool)); } @@ -953,11 +970,13 @@ public class TestRetryCacheWithHA { @Override void prepare() throws Exception { + expectedUpdateCount++; client.addCachePool(new CachePoolInfo(pool).setLimit(10l)); } @Override void invoke() throws Exception { + expectedUpdateCount++; client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l)); } @@ -990,11 +1009,13 @@ public class TestRetryCacheWithHA { @Override void prepare() throws Exception { + expectedUpdateCount++; client.addCachePool(new CachePoolInfo(pool)); } @Override void invoke() throws Exception { + expectedUpdateCount++; client.removeCachePool(pool); } @@ -1029,12 +1050,14 @@ public class TestRetryCacheWithHA { void prepare() throws Exception { Path p = new Path(src); if (!dfs.exists(p)) { + expectedUpdateCount++; DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); } } @Override void invoke() throws Exception { + expectedUpdateCount++; client.setXAttr(src, "user.key", "value".getBytes(), EnumSet.of(XAttrSetFlag.CREATE)); } @@ -1071,7 +1094,9 @@ public class TestRetryCacheWithHA { void prepare() throws Exception { Path p = new Path(src); if (!dfs.exists(p)) { + expectedUpdateCount++; DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); + expectedUpdateCount++; client.setXAttr(src, "user.key", "value".getBytes(), EnumSet.of(XAttrSetFlag.CREATE)); } @@ -1079,6 +1104,7 @@ public class TestRetryCacheWithHA { @Override void invoke() throws Exception { + expectedUpdateCount++; client.removeXAttr(src, "user.key"); } @@ -1315,6 +1341,13 @@ public class TestRetryCacheWithHA { assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0); // Cache updated metrics on NN0 should be >0 since NN1 applied the editlog assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0); + long expectedUpdateCount = op.getExpectedCacheUpdateCount(); + if (expectedUpdateCount > 0) { + assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount, + updatedNN0); + assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount, + updatedNN1); + } } /**