This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch HDFS-17384
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 213f478f58cbbb57b67ae4f37b2c1639677742c7
Author: ZanderXu <zande...@apache.org>
AuthorDate: Fri Mar 22 10:08:46 2024 +0800

    HDFS-17413. [FGL] CacheReplicationMonitor supports fine-grained lock (#6641)
---
 .../blockmanagement/CacheReplicationMonitor.java  | 13 +++++----
 .../hadoop/hdfs/server/namenode/CacheManager.java | 33 +++++++++++-----------
 .../hadoop/hdfs/server/namenode/FSNamesystem.java | 32 ++++++++++-----------
 3 files changed, 40 insertions(+), 38 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index f9036c550e85..fad9f4248b4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.GSet;
@@ -223,7 +224,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * after are not atomic.
*/ public void waitForRescanIfNeeded() { - Preconditions.checkArgument(!namesystem.hasWriteLock(), + Preconditions.checkArgument(!namesystem.hasWriteLock(FSNamesystemLockMode.FS), "Must not hold the FSN write lock when waiting for a rescan."); Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan."); @@ -268,7 +269,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { */ @Override public void close() throws IOException { - Preconditions.checkArgument(namesystem.hasWriteLock()); + Preconditions.checkArgument(namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)); lock.lock(); try { if (shutdown) return; @@ -291,7 +292,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { scannedBlocks = 0; lastScanTimeMs = Time.monotonicNow(); try { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { lock.lock(); if (shutdown) { @@ -308,7 +309,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { rescanCachedBlockMap(); blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime(); } finally { - namesystem.writeUnlock("cacheReplicationMonitorRescan"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan"); } } @@ -325,11 +326,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable { long now = Time.monotonicNow(); if (now - last > cacheManager.getMaxLockTimeMs()) { try { - namesystem.writeUnlock(); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan"); Thread.sleep(cacheManager.getSleepTimeMs()); } catch (InterruptedException e) { } finally { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 24ccf45b91d2..d296ff278b5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBl import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; @@ -317,7 +318,7 @@ public class CacheManager { } public void clearDirectiveStats() { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); for (CacheDirective directive : directivesById.values()) { directive.resetStatistics(); } @@ -327,7 +328,7 @@ public class CacheManager { * @return Unmodifiable view of the collection of CachePools. */ public Collection<CachePool> getCachePools() { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.FS); return Collections.unmodifiableCollection(cachePools.values()); } @@ -335,18 +336,18 @@ public class CacheManager { * @return Unmodifiable view of the collection of CacheDirectives. 
*/ public Collection<CacheDirective> getCacheDirectives() { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.FS); return Collections.unmodifiableCollection(directivesById.values()); } @VisibleForTesting public GSet<CachedBlock, CachedBlock> getCachedBlocks() { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.BM); return cachedBlocks; } private long getNextDirectiveId() throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); if (nextDirectiveId >= Long.MAX_VALUE - 1) { throw new IOException("No more available IDs."); } @@ -574,7 +575,7 @@ public class CacheManager { public CacheDirectiveInfo addDirective( CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); CacheDirective directive; try { CachePool pool = getCachePool(validatePoolName(info)); @@ -652,7 +653,7 @@ public class CacheManager { public void modifyDirective(CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); String idString = (info.getId() == null) ? "(null)" : info.getId().toString(); @@ -703,7 +704,7 @@ public class CacheManager { private void removeInternal(CacheDirective directive) throws InvalidRequestException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); // Remove the corresponding entry in directivesByPath. 
String path = directive.getPath(); if (!directivesByPath.remove(path, directive)) { @@ -724,7 +725,7 @@ public class CacheManager { public void removeDirective(long id, FSPermissionChecker pc) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); try { CacheDirective directive = getById(id); checkWritePermission(pc, directive.getPool()); @@ -740,7 +741,7 @@ public class CacheManager { listCacheDirectives(long prevId, CacheDirectiveInfo filter, FSPermissionChecker pc) throws IOException { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.FS); final int NUM_PRE_ALLOCATED_ENTRIES = 16; String filterPath = null; if (filter.getPath() != null) { @@ -815,7 +816,7 @@ public class CacheManager { */ public CachePoolInfo addCachePool(CachePoolInfo info) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); CachePool pool; try { CachePoolInfo.validate(info); @@ -845,7 +846,7 @@ public class CacheManager { */ public void modifyCachePool(CachePoolInfo info) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); StringBuilder bld = new StringBuilder(); try { CachePoolInfo.validate(info); @@ -915,7 +916,7 @@ public class CacheManager { */ public void removeCachePool(String poolName) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.FS); try { CachePoolInfo.validateName(poolName); CachePool pool = cachePools.remove(poolName); @@ -941,7 +942,7 @@ public class CacheManager { public BatchedListEntries<CachePoolEntry> listCachePools(FSPermissionChecker pc, String prevKey) { - assert namesystem.hasReadLock(); + assert namesystem.hasReadLock(FSNamesystemLockMode.FS); final int NUM_PRE_ALLOCATED_ENTRIES = 16; ArrayList<CachePoolEntry> results = new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES); @@ 
-1008,7 +1009,7 @@ public class CacheManager { datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size()); return; } - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); final long startTime = Time.monotonicNow(); final long endTime; try { @@ -1022,7 +1023,7 @@ public class CacheManager { processCacheReportImpl(datanode, blockIds); } finally { endTime = Time.monotonicNow(); - namesystem.writeUnlock("processCacheReport"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCacheReport"); } // Log the block report processing stats from Namenode perspective diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 1fda50be26a9..202c59ac5ec7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -7767,7 +7767,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, checkOperation(OperationCategory.WRITE); FSPermissionChecker.setOperationType(operationName); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot add cache directive"); @@ -7776,7 +7776,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } finally { effectiveDirectiveStr = effectiveDirective != null ? 
effectiveDirective.toString() : null; - writeUnlock(operationName, + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(effectiveDirectiveStr)); } } catch (AccessControlException ace) { @@ -7800,14 +7800,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, FSPermissionChecker.setOperationType(operationName); checkOperation(OperationCategory.WRITE); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot add cache directive"); FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags, logRetryCache); } finally { - writeUnlock(operationName, + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr, directive.toString())); } } catch (AccessControlException ace) { @@ -7826,14 +7826,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, checkOperation(OperationCategory.WRITE); FSPermissionChecker.setOperationType(operationName); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot remove cache directives"); FSNDNCacheOp.removeCacheDirective(this, cacheManager, id, logRetryCache); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(idStr)); + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, idStr, null, null); @@ -7851,13 +7851,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, BatchedListEntries<CacheDirectiveEntry> results; cacheManager.waitForRescanIfNeeded(); try { - readLock(); + readLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.READ); results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId, filter); } finally { - readUnlock(operationName, + readUnlock(FSNamesystemLockMode.FS, operationName, 
getLockReportInfoSupplier(filter.toString())); } } catch (AccessControlException ace) { @@ -7876,7 +7876,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, String poolName = req == null ? null : req.getPoolName(); checkSuperuserPrivilege(operationName, poolName); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot add cache pool" @@ -7885,7 +7885,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, logRetryCache); poolInfoStr = info.toString(); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr)); + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolInfoStr)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, poolInfoStr); @@ -7903,14 +7903,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, (req == null ? null : req.getPoolName()) + "}"; checkSuperuserPrivilege(operationName, poolNameStr); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot modify cache pool" + (req == null ? null : req.getPoolName())); FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr, + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr, req == null ? 
null : req.toString())); } } catch (AccessControlException ace) { @@ -7930,14 +7930,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, String poolNameStr = "{poolName: " + cachePoolName + "}"; checkSuperuserPrivilege(operationName, poolNameStr); try { - writeLock(); + writeLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName); FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName, logRetryCache); } finally { - writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr)); + writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, poolNameStr); @@ -7955,12 +7955,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, FSPermissionChecker.setOperationType(operationName); cacheManager.waitForRescanIfNeeded(); try { - readLock(); + readLock(FSNamesystemLockMode.FS); try { checkOperation(OperationCategory.READ); results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey); } finally { - readUnlock(operationName, getLockReportInfoSupplier(null)); + readUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(null)); } } catch (AccessControlException ace) { logAuditEvent(false, operationName, null); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org