This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch HDFS-17384
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 213f478f58cbbb57b67ae4f37b2c1639677742c7
Author: ZanderXu <zande...@apache.org>
AuthorDate: Fri Mar 22 10:08:46 2024 +0800

    HDFS-17413. [FGL] CacheReplicationMonitor supports fine-grained lock (#6641)
---
 .../blockmanagement/CacheReplicationMonitor.java   | 13 +++++----
 .../hadoop/hdfs/server/namenode/CacheManager.java  | 33 +++++++++++-----------
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  | 32 ++++++++++-----------
 3 files changed, 40 insertions(+), 38 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index f9036c550e85..fad9f4248b4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.GSet;
@@ -223,7 +224,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * after are not atomic.
    */
   public void waitForRescanIfNeeded() {
-    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+    Preconditions.checkArgument(!namesystem.hasWriteLock(FSNamesystemLockMode.FS),
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
@@ -268,7 +269,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    */
   @Override
   public void close() throws IOException {
-    Preconditions.checkArgument(namesystem.hasWriteLock());
+    Preconditions.checkArgument(namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
     lock.lock();
     try {
       if (shutdown) return;
@@ -291,7 +292,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     scannedBlocks = 0;
     lastScanTimeMs = Time.monotonicNow();
     try {
-      namesystem.writeLock();
+      namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       try {
         lock.lock();
         if (shutdown) {
@@ -308,7 +309,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       rescanCachedBlockMap();
       blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
     } finally {
-      namesystem.writeUnlock("cacheReplicationMonitorRescan");
+      namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
     }
   }
 
@@ -325,11 +326,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     long now = Time.monotonicNow();
     if (now - last > cacheManager.getMaxLockTimeMs()) {
       try {
-        namesystem.writeUnlock();
+        namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
         Thread.sleep(cacheManager.getSleepTimeMs());
       } catch (InterruptedException e) {
       } finally {
-        namesystem.writeLock();
+        namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 24ccf45b91d2..d296ff278b5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBl
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -317,7 +318,7 @@ public class CacheManager {
   }
 
   public void clearDirectiveStats() {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     for (CacheDirective directive : directivesById.values()) {
       directive.resetStatistics();
     }
@@ -327,7 +328,7 @@ public class CacheManager {
    * @return Unmodifiable view of the collection of CachePools.
    */
   public Collection<CachePool> getCachePools() {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     return Collections.unmodifiableCollection(cachePools.values());
   }
 
@@ -335,18 +336,18 @@ public class CacheManager {
    * @return Unmodifiable view of the collection of CacheDirectives.
    */
   public Collection<CacheDirective> getCacheDirectives() {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     return Collections.unmodifiableCollection(directivesById.values());
   }
   
   @VisibleForTesting
   public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
     return cachedBlocks;
   }
 
   private long getNextDirectiveId() throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     if (nextDirectiveId >= Long.MAX_VALUE - 1) {
       throw new IOException("No more available IDs.");
     }
@@ -574,7 +575,7 @@ public class CacheManager {
   public CacheDirectiveInfo addDirective(
       CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     CacheDirective directive;
     try {
       CachePool pool = getCachePool(validatePoolName(info));
@@ -652,7 +653,7 @@ public class CacheManager {
 
   public void modifyDirective(CacheDirectiveInfo info,
       FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     String idString =
         (info.getId() == null) ?
             "(null)" : info.getId().toString();
@@ -703,7 +704,7 @@ public class CacheManager {
 
   private void removeInternal(CacheDirective directive)
       throws InvalidRequestException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     // Remove the corresponding entry in directivesByPath.
     String path = directive.getPath();
     if (!directivesByPath.remove(path, directive)) {
@@ -724,7 +725,7 @@ public class CacheManager {
 
   public void removeDirective(long id, FSPermissionChecker pc)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     try {
       CacheDirective directive = getById(id);
       checkWritePermission(pc, directive.getPool());
@@ -740,7 +741,7 @@ public class CacheManager {
         listCacheDirectives(long prevId,
             CacheDirectiveInfo filter,
             FSPermissionChecker pc) throws IOException {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     String filterPath = null;
     if (filter.getPath() != null) {
@@ -815,7 +816,7 @@ public class CacheManager {
    */
   public CachePoolInfo addCachePool(CachePoolInfo info)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     CachePool pool;
     try {
       CachePoolInfo.validate(info);
@@ -845,7 +846,7 @@ public class CacheManager {
    */
   public void modifyCachePool(CachePoolInfo info)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     StringBuilder bld = new StringBuilder();
     try {
       CachePoolInfo.validate(info);
@@ -915,7 +916,7 @@ public class CacheManager {
    */
   public void removeCachePool(String poolName)
       throws IOException {
-    assert namesystem.hasWriteLock();
+    assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
     try {
       CachePoolInfo.validateName(poolName);
       CachePool pool = cachePools.remove(poolName);
@@ -941,7 +942,7 @@ public class CacheManager {
 
   public BatchedListEntries<CachePoolEntry>
       listCachePools(FSPermissionChecker pc, String prevKey) {
-    assert namesystem.hasReadLock();
+    assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     ArrayList<CachePoolEntry> results = 
         new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -1008,7 +1009,7 @@ public class CacheManager {
           datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
       return;
     }
-    namesystem.writeLock();
+    namesystem.writeLock(FSNamesystemLockMode.BM);
     final long startTime = Time.monotonicNow();
     final long endTime;
     try {
@@ -1022,7 +1023,7 @@ public class CacheManager {
       processCacheReportImpl(datanode, blockIds);
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock("processCacheReport");
+      namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCacheReport");
     }
 
     // Log the block report processing stats from Namenode perspective
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 1fda50be26a9..202c59ac5ec7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7767,7 +7767,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.WRITE);
     FSPermissionChecker.setOperationType(operationName);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot add cache directive");
@@ -7776,7 +7776,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       } finally {
         effectiveDirectiveStr = effectiveDirective != null ?
             effectiveDirective.toString() : null;
-        writeUnlock(operationName,
+        writeUnlock(FSNamesystemLockMode.FS, operationName,
             getLockReportInfoSupplier(effectiveDirectiveStr));
       }
     } catch (AccessControlException ace) {
@@ -7800,14 +7800,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSPermissionChecker.setOperationType(operationName);
     checkOperation(OperationCategory.WRITE);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot add cache directive");
         FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
             logRetryCache);
       } finally {
-        writeUnlock(operationName,
+        writeUnlock(FSNamesystemLockMode.FS, operationName,
             getLockReportInfoSupplier(idStr, directive.toString()));
       }
     } catch (AccessControlException ace) {
@@ -7826,14 +7826,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.WRITE);
     FSPermissionChecker.setOperationType(operationName);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot remove cache directives");
         FSNDNCacheOp.removeCacheDirective(this, cacheManager, id,
             logRetryCache);
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(idStr));
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, idStr, null, null);
@@ -7851,13 +7851,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BatchedListEntries<CacheDirectiveEntry> results;
     cacheManager.waitForRescanIfNeeded();
     try {
-      readLock();
+      readLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.READ);
         results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
             filter);
       } finally {
-        readUnlock(operationName,
+        readUnlock(FSNamesystemLockMode.FS, operationName,
             getLockReportInfoSupplier(filter.toString()));
       }
     } catch (AccessControlException ace) {
@@ -7876,7 +7876,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String poolName = req == null ? null : req.getPoolName();
     checkSuperuserPrivilege(operationName, poolName);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot add cache pool"
@@ -7885,7 +7885,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             logRetryCache);
         poolInfoStr = info.toString();
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr));
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolInfoStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, poolInfoStr);
@@ -7903,14 +7903,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         (req == null ? null : req.getPoolName()) + "}";
     checkSuperuserPrivilege(operationName, poolNameStr);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot modify cache pool"
             + (req == null ? null : req.getPoolName()));
         FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr,
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr,
             req == null ? null : req.toString()));
       }
     } catch (AccessControlException ace) {
@@ -7930,14 +7930,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String poolNameStr = "{poolName: " + cachePoolName + "}";
     checkSuperuserPrivilege(operationName, poolNameStr);
     try {
-      writeLock();
+      writeLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.WRITE);
         checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
         FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
             logRetryCache);
       } finally {
-        writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr));
+        writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, poolNameStr);
@@ -7955,12 +7955,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     FSPermissionChecker.setOperationType(operationName);
     cacheManager.waitForRescanIfNeeded();
     try {
-      readLock();
+      readLock(FSNamesystemLockMode.FS);
       try {
         checkOperation(OperationCategory.READ);
         results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
       } finally {
-        readUnlock(operationName, getLockReportInfoSupplier(null));
+        readUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(null));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, null);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to