This is an automated email from the ASF dual-hosted git repository. zanderxu pushed a commit to branch HDFS-17384 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 07c4b73dc5008e4d3815a0b77e9528049b519d5c Author: ZanderXu <zande...@apache.org> AuthorDate: Wed Mar 27 09:45:17 2024 +0800 HDFS-17417. [FGL] HeartbeatManager and DatanodeAdminMonitor support fine-grained locking (#6656) --- .../hdfs/server/blockmanagement/BlockManager.java | 10 +++--- .../DatanodeAdminBackoffMonitor.java | 38 ++++++++++++---------- .../DatanodeAdminDefaultMonitor.java | 11 ++++--- .../server/blockmanagement/DatanodeManager.java | 5 +-- .../server/blockmanagement/HeartbeatManager.java | 9 ++--- 5 files changed, 41 insertions(+), 32 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 82b82433e70e..23a864f731b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1825,7 +1825,7 @@ public class BlockManager implements BlockStatsMXBean { /** Remove the blocks associated to the given DatanodeStorageInfo. */ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.BM); final Iterator<BlockInfo> it = storageInfo.getBlockIterator(); DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); while(it.hasNext()) { @@ -4876,6 +4876,7 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas num = countNodes(block); if (shouldProcessExtraRedundancy(num, expectedReplication)) { // extra redundancy block + // Here involves storage policy ID. processExtraRedundancyBlock(block, (short) expectedReplication, null, null); numExtraRedundancy++; @@ -4884,14 +4885,15 @@ public class BlockManager implements BlockStatsMXBean { // When called by tests like TestDefaultBlockPlacementPolicy. // testPlacementWithLocalRackNodesDecommissioned, it is not protected by // lock, only when called by DatanodeManager.refreshNodes have writeLock - if (namesystem.hasWriteLock()) { - namesystem.writeUnlock("processExtraRedundancyBlocksOnInService"); + if (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)) { + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, + "processExtraRedundancyBlocksOnInService"); try { Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); } } LOG.info("Invalidated {} extra redundancy blocks on {} after " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java index 79d5a065b08a..d212d142d441 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.namenode.INode; @@ -170,7 +171,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase numBlocksChecked = 0; // Check decommission or maintenance progress. try { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); try { /** * Other threads can modify the pendingNode list and the cancelled @@ -208,7 +209,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase processPendingNodes(); } finally { - namesystem.writeUnlock("DatanodeAdminMonitorV2Thread"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "DatanodeAdminMonitorV2Thread"); } // After processing the above, various parts of the check() method will // take and drop the read / write lock as needed. Aside from the @@ -326,7 +327,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase */ private void processMaintenanceNodes() { // Check for any maintenance state nodes which need to be expired - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { for (DatanodeDescriptor dn : outOfServiceNodeBlocks.keySet()) { if (dn.isMaintenance() && dn.maintenanceExpired()) { @@ -338,12 +339,12 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase // which added the node to the cancelled list. Therefore expired // maintenance nodes do not need to be added to the toRemove list. dnAdmin.stopMaintenance(dn); - namesystem.writeUnlock("processMaintenanceNodes"); - namesystem.writeLock(); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes"); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); } } } finally { - namesystem.writeUnlock("processMaintenanceNodes"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMaintenanceNodes"); } } @@ -360,7 +361,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase // taking the write lock at all. return; } - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); try { for (DatanodeDescriptor dn : toRemove) { final boolean isHealthy = @@ -402,7 +403,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase } } } finally { - namesystem.writeUnlock("processCompletedNodes"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCompletedNodes"); } } @@ -486,7 +487,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase return; } - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { long repQueueSize = blockManager.getLowRedundancyBlocksCount(); @@ -524,8 +525,8 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase // replication if (blocksProcessed >= blocksPerLock) { blocksProcessed = 0; - namesystem.writeUnlock("moveBlocksToPending"); - namesystem.writeLock(); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending"); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); } blocksProcessed++; if (nextBlockAddedToPending(blockIt, dn)) { @@ -546,7 +547,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase } } } finally { - namesystem.writeUnlock("moveBlocksToPending"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "moveBlocksToPending"); } LOG.debug("{} blocks are now pending replication", pendingCount); } @@ -626,15 +627,16 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase } DatanodeStorageInfo[] storage; - namesystem.readLock(); + namesystem.readLock(FSNamesystemLockMode.BM); try { storage = dn.getStorageInfos(); } finally { - namesystem.readUnlock("scanDatanodeStorage"); + namesystem.readUnlock(FSNamesystemLockMode.BM, "scanDatanodeStorage"); } for (DatanodeStorageInfo s : storage) { - namesystem.readLock(); + // isBlockReplicatedOk involves FS. + namesystem.readLock(FSNamesystemLockMode.GLOBAL); try { // As the lock is dropped and re-taken between each storage, we need // to check the storage is still present before processing it, as it @@ -660,7 +662,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase numBlocksChecked++; } } finally { - namesystem.readUnlock("scanDatanodeStorage"); + namesystem.readUnlock(FSNamesystemLockMode.GLOBAL, "scanDatanodeStorage"); } } } @@ -683,7 +685,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase * namenode write lock while it runs. */ private void processPendingReplication() { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { for (Iterator<Map.Entry<DatanodeDescriptor, List<BlockInfo>>> entIt = pendingRep.entrySet().iterator(); entIt.hasNext();) { @@ -715,7 +717,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase suspectBlocks.getOutOfServiceBlockCount()); } } finally { - namesystem.writeUnlock("processPendingReplication"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processPendingReplication"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java index 94049b35dc48..cf06d53b8bae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -182,7 +183,9 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase numBlocksCheckedPerLock = 0; numNodesChecked = 0; // Check decommission or maintenance progress. - namesystem.writeLock(); + // dnAdmin.stopMaintenance(dn) needs FSReadLock + // since processExtraRedundancyBlock involves storage policy and isSufficient involves bc. + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { processCancelledNodes(); processPendingNodes(); @@ -191,7 +194,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase LOG.warn("DatanodeAdminMonitor caught exception when processing node.", e); } finally { - namesystem.writeUnlock("DatanodeAdminMonitorThread"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "DatanodeAdminMonitorThread"); } if (numBlocksChecked + numNodesChecked > 0) { LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " + @@ -426,7 +429,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase // lock. // Yielding is required in case of block number is greater than the // configured per-iteration-limit. - namesystem.writeUnlock("processBlocksInternal"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processBlocksInternal"); try { LOG.debug("Yielded lock during decommission/maintenance check"); Thread.sleep(0, 500); @@ -435,7 +438,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase } // reset numBlocksCheckedPerLock = 0; - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); } numBlocksChecked++; numBlocksCheckedPerLock++; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index dc22fe22c96e..392f86d79fd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1335,12 +1335,13 @@ public class DatanodeManager { */ public void refreshNodes(final Configuration conf) throws IOException { refreshHostsReader(conf); - namesystem.writeLock(); + // processExtraRedundancyBlocksOnInService involves FS in stopMaintenance and stopDecommission. + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { refreshDatanodes(); countSoftwareVersions(); } finally { - namesystem.writeUnlock("refreshNodes"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "refreshNodes"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index 429d40d9fbdd..6961e9912c55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.util.Daemon; @@ -514,20 +515,20 @@ class HeartbeatManager implements DatanodeStatistics { for (DatanodeDescriptor dead : deadDatanodes) { // acquire the fsnamesystem lock, and then remove the dead node. - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); try { dm.removeDeadDatanode(dead, !dead.isMaintenance()); } finally { - namesystem.writeUnlock("removeDeadDatanode"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeDeadDatanode"); } } for (DatanodeStorageInfo failedStorage : failedStorages) { // acquire the fsnamesystem lock, and remove blocks on the storage. - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); try { blockManager.removeBlocksAssociatedTo(failedStorage); } finally { - namesystem.writeUnlock("removeBlocksAssociatedTo"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeBlocksAssociatedTo"); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org