This is an automated email from the ASF dual-hosted git repository. zanderxu pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit ac0605db419294166a68ad03f65b23a1ad89a329 Author: ZanderXu <zande...@apache.org> AuthorDate: Tue Mar 26 10:50:20 2024 +0800 HDFS-17416. [FGL] Monitor threads in BlockManager.class support fine-grained lock (#6647) --- .../hdfs/server/blockmanagement/BlockManager.java | 76 +++++++++++++--------- .../server/blockmanagement/ProvidedStorageMap.java | 2 +- .../hadoop/hdfs/server/namenode/FSNamesystem.java | 9 ++- .../server/blockmanagement/TestBlockManager.java | 2 +- .../blockmanagement/TestProvidedStorageMap.java | 3 +- .../blockmanagement/TestReplicationPolicy.java | 7 ++ 6 files changed, 64 insertions(+), 35 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 18049d510b6..f74c1de0b8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2107,7 +2107,9 @@ int computeInvalidateWork(int nodesToProcess) { */ int computeBlockReconstructionWork(int blocksToProcess) { List<List<BlockInfo>> blocksToReconstruct = null; - namesystem.writeLock(); + // TODO: Change it to readLock(FSNamesystemLockMode.BM) + // since chooseLowRedundancyBlocks is thread safe. 
+ namesystem.writeLock(FSNamesystemLockMode.BM); try { boolean reset = false; if (replQueueResetToHeadThreshold > 0) { @@ -2122,7 +2124,7 @@ int computeBlockReconstructionWork(int blocksToProcess) { blocksToReconstruct = neededReconstruction .chooseLowRedundancyBlocks(blocksToProcess, reset); } finally { - namesystem.writeUnlock("computeBlockReconstructionWork"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeBlockReconstructionWork"); } return computeReconstructionWorkForBlocks(blocksToReconstruct); } @@ -2141,7 +2143,9 @@ int computeReconstructionWorkForBlocks( List<BlockReconstructionWork> reconWork = new ArrayList<>(); // Step 1: categorize at-risk blocks into replication and EC tasks - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesystemLockMode.GLOBAL) + // since neededReconstruction is thread safe. + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); try { synchronized (neededReconstruction) { for (int priority = 0; priority < blocksToReconstruct @@ -2156,7 +2160,7 @@ int computeReconstructionWorkForBlocks( } } } finally { - namesystem.writeUnlock("computeReconstructionWorkForBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "computeReconstructionWorkForBlocks"); } // Step 2: choose target nodes for each reconstruction task @@ -2181,7 +2185,9 @@ int computeReconstructionWorkForBlocks( } // Step 3: add tasks to the DN - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesystemLockMode.BM) + // since pendingReconstruction and neededReconstruction are thread safe. 
+ namesystem.writeLock(FSNamesystemLockMode.BM); try { for (BlockReconstructionWork rw : reconWork) { final DatanodeStorageInfo[] targets = rw.getTargets(); @@ -2197,7 +2203,7 @@ int computeReconstructionWorkForBlocks( } } } finally { - namesystem.writeUnlock("computeReconstructionWorkForBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeReconstructionWorkForBlocks"); } if (blockLog.isDebugEnabled()) { @@ -2688,7 +2694,9 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, void processPendingReconstructions() { BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks(); if (timedOutItems != null) { - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesystemLockMode.BM) + // since neededReconstruction is thread safe. + namesystem.writeLock(FSNamesystemLockMode.BM); try { for (int i = 0; i < timedOutItems.length; i++) { /* @@ -2707,7 +2715,7 @@ void processPendingReconstructions() { } } } finally { - namesystem.writeUnlock("processPendingReconstructions"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "processPendingReconstructions"); } /* If we know the target datanodes where the replication timedout, * we could invoke decBlocksScheduled() on it. Its ok for now. 
@@ -2902,7 +2910,7 @@ public boolean processReport(final DatanodeID nodeID, final DatanodeStorage storage, final BlockListAsLongs newReport, BlockReportContext context) throws IOException { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); final long startTime = Time.monotonicNow(); //after acquiring write lock final long endTime; DatanodeDescriptor node; @@ -2960,7 +2968,7 @@ public boolean processReport(final DatanodeID nodeID, storageInfo.receivedBlockReport(); } finally { endTime = Time.monotonicNow(); - namesystem.writeUnlock("processReport"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processReport"); } if (blockLog.isDebugEnabled()) { @@ -3033,7 +3041,7 @@ void rescanPostponedMisreplicatedBlocks() { if (getPostponedMisreplicatedBlocksCount() == 0) { return; } - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.GLOBAL); long startTime = Time.monotonicNow(); long startSize = postponedMisreplicatedBlocks.size(); try { @@ -3062,7 +3070,8 @@ void rescanPostponedMisreplicatedBlocks() { postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks); rescannedMisreplicatedBlocks.clear(); long endSize = postponedMisreplicatedBlocks.size(); - namesystem.writeUnlock("rescanPostponedMisreplicatedBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, + "rescanPostponedMisreplicatedBlocks"); LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" + " msecs. {} blocks are left. {} blocks were removed.", (Time.monotonicNow() - startTime), endSize, (startSize - endSize)); @@ -3104,7 +3113,8 @@ void processTimedOutExcessBlocks() { if (excessRedundancyMap.size() == 0) { return; } - namesystem.writeLock(); + // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe. 
+ namesystem.writeLock(FSNamesystemLockMode.BM); long now = Time.monotonicNow(); int processed = 0; try { @@ -3158,7 +3168,7 @@ void processTimedOutExcessBlocks() { } } } finally { - namesystem.writeUnlock("processTimedOutExcessBlocks"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "processTimedOutExcessBlocks"); LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now)); } } @@ -3264,7 +3274,7 @@ void processFirstBlockReport( final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; - assert (namesystem.hasWriteLock()); + assert (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)); assert (storageInfo.getBlockReportCount() == 0); for (BlockReportReplica iblk : report) { @@ -3316,6 +3326,7 @@ void processFirstBlockReport( // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 + // isInSnapshot involves the full path, so it needs FSReadLock. 
if (namesystem.isInSnapshot(storedBlock.getBlockCollectionId())) { int numOfReplicas = storedBlock.getUnderConstructionFeature() .getNumExpectedLocations(); @@ -3731,7 +3742,7 @@ void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, DatanodeStorageInfo storageInfo) throws IOException { - assert (storedBlock != null && namesystem.hasWriteLock()); + assert (storedBlock != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)); if (!namesystem.isInStartupSafeMode() || isPopulatingReplQueues()) { addStoredBlock(storedBlock, reported, storageInfo, null, false); @@ -3766,7 +3777,7 @@ private Block addStoredBlock(final BlockInfo block, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { - assert block != null && namesystem.hasWriteLock(); + assert block != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (!block.isComplete()) { @@ -4002,7 +4013,7 @@ private void processMisReplicatesAsync() throws InterruptedException { while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) { int processed = 0; - namesystem.writeLockInterruptibly(); + namesystem.writeLockInterruptibly(FSNamesystemLockMode.GLOBAL); try { while (processed < numBlocksPerIteration && blocksItr.hasNext()) { BlockInfo block = blocksItr.next(); @@ -4061,7 +4072,7 @@ private void processMisReplicatesAsync() throws InterruptedException { break; } } finally { - namesystem.writeUnlock("processMisReplicatesAsync"); + namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMisReplicatesAsync"); LOG.info("Reconstruction queues initialisation progress: {}, total number of blocks " + "processed: {}/{}", reconstructionQueuesInitProgress, totalProcessed, totalBlocks); // Make sure it is out of the write lock for sufficiently long time. 
@@ -4214,7 +4225,7 @@ private void processExtraRedundancyBlock(final BlockInfo block, private boolean processExtraRedundancyBlockWithoutPostpone(final BlockInfo block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); if (addedNode == delNodeHint) { delNodeHint = null; } @@ -4258,7 +4269,10 @@ private void chooseExcessRedundancies( BlockInfo storedBlock, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { - assert namesystem.hasWriteLock(); + // bc.getStoragePolicyID() needs FSReadLock. + // TODO: Change to hasReadLock(FSNamesystemLockMode.GLOBAL) + // since chooseExcessRedundancyContiguous is thread safe. + assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); // first form a rack to datanodes map and BlockCollection bc = getBlockCollection(storedBlock); if (storedBlock.isStriped()) { @@ -4627,7 +4641,7 @@ private boolean processAndHandleReportedBlock( */ public void processIncrementalBlockReport(final DatanodeID nodeID, final StorageReceivedDeletedBlocks srdb) throws IOException { - assert namesystem.hasWriteLock(); + assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL); final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); if (node == null || !node.isRegistered()) { blockLog.warn("BLOCK* processIncrementalBlockReport" @@ -4991,6 +5005,8 @@ public void updateLastBlock(BlockInfo lastBlock, ExtendedBlock newBlock) { /** updates a block in needed reconstruction queue. */ private void updateNeededReconstructions(final BlockInfo block, final int curReplicasDelta, int expectedReplicasDelta) { + // TODO: Change to readLock(FSNamesystemLockMode.BM) + // since pendingReconstruction and neededReconstruction are thread safe. 
namesystem.writeLock(FSNamesystemLockMode.BM); try { if (!isPopulatingReplQueues() || !block.isComplete()) { @@ -5042,8 +5058,9 @@ public void checkRedundancy(BlockCollection bc) { */ private int invalidateWorkForOneNode(DatanodeInfo dn) { final List<Block> toInvalidate; - - namesystem.writeLock(); + + // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe. + namesystem.writeLock(FSNamesystemLockMode.BM); try { // blocks should not be replicated or removed if safe mode is on if (namesystem.isInSafeMode()) { @@ -5067,7 +5084,7 @@ private int invalidateWorkForOneNode(DatanodeInfo dn) { return 0; } } finally { - namesystem.writeUnlock("invalidateWorkForOneNode"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "invalidateWorkForOneNode"); } if (blockLog.isDebugEnabled()) { blockLog.debug("BLOCK* {}: ask {} to delete {}", @@ -5295,7 +5312,7 @@ private class MarkedDeleteBlockScrubber implements Runnable { private void remove(long time) { if (checkToDeleteIterator()) { - namesystem.writeLock(); + namesystem.writeLock(FSNamesystemLockMode.BM); try { while (toDeleteIterator.hasNext()) { removeBlock(toDeleteIterator.next()); @@ -5306,7 +5323,7 @@ private void remove(long time) { } } } finally { - namesystem.writeUnlock("markedDeleteBlockScrubberThread"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "markedDeleteBlockScrubberThread"); } } } @@ -5420,12 +5437,13 @@ int computeDatanodeWork() { int workFound = this.computeBlockReconstructionWork(blocksToProcess); // Update counters - namesystem.writeLock(); + // TODO: Make corruptReplicas thread safe to remove this lock. 
+ namesystem.writeLock(FSNamesystemLockMode.BM); try { this.updateState(); this.scheduledReplicationBlocksCount = workFound; } finally { - namesystem.writeUnlock("computeDatanodeWork"); + namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeDatanodeWork"); } workFound += this.computeInvalidateWork(nodesToProcess); return workFound; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java index 4e4464b3259..558c43d0677 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java @@ -145,7 +145,7 @@ DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s) private void processProvidedStorageReport() throws IOException { - assert lock.hasWriteLock() : "Not holding write lock"; + assert lock.hasWriteLock(FSNamesystemLockMode.GLOBAL) : "Not holding write lock"; if (providedStorageInfo.getBlockReportCount() == 0 || providedDescriptor.activeProvidedDatanodes() == 0) { LOG.info("Calling process first blk report from storage: " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 7e1c90966ae..22ab3a5e4a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3973,7 +3973,7 @@ BlockInfo getStoredBlock(Block block) { @Override public boolean isInSnapshot(long blockCollectionID) { - assert hasReadLock(); + assert 
hasReadLock(FSNamesystemLockMode.FS); final INodeFile bc = getBlockCollection(blockCollectionID); if (bc == null || !bc.isUnderConstruction()) { return false; @@ -5371,11 +5371,14 @@ NamenodeCommand startCheckpoint(NamenodeRegistration backupNode, public void processIncrementalBlockReport(final DatanodeID nodeID, final StorageReceivedDeletedBlocks srdb) throws IOException { - writeLock(); + // completeBlock will updateQuota, so it needs BMWriteLock and FSWriteLock. + // processExtraRedundancyBlock chooses excess replicas depending on storage policyId, + // so it needs FSReadLock. + writeLock(FSNamesystemLockMode.GLOBAL); try { blockManager.processIncrementalBlockReport(nodeID, srdb); } finally { - writeUnlock("processIncrementalBlockReport"); + writeUnlock(FSNamesystemLockMode.GLOBAL, "processIncrementalBlockReport"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 697c4975e2f..22d203c98d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -2334,4 +2334,4 @@ public void delayDeleteReplica() { DataNodeFaultInjector.set(oldInjector); } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java index c7f83797862..aa692185cb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestProvidedImpl; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.util.RwLock; import org.junit.Before; @@ -87,7 +88,7 @@ public void testProvidedStorageMap() throws IOException { DatanodeStorage dn1DiskStorage = new DatanodeStorage( "sid-1", DatanodeStorage.State.NORMAL, StorageType.DISK); - when(nameSystemLock.hasWriteLock()).thenReturn(true); + when(nameSystemLock.hasWriteLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true); DatanodeStorageInfo dns1Provided = providedMap.getStorage(dn1, dn1ProvidedStorage); DatanodeStorageInfo dns1Disk = providedMap.getStorage(dn1, dn1DiskStorage); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index b99e060ee38..fd7ea14446a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.TestINodeFile; +import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.ReflectionUtils; @@ -1406,6 
+1407,12 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication() FSNamesystem mockNS = mock(FSNamesystem.class); when(mockNS.hasWriteLock()).thenReturn(true); when(mockNS.hasReadLock()).thenReturn(true); + when(mockNS.hasWriteLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true); + when(mockNS.hasReadLock(FSNamesystemLockMode.GLOBAL)).thenReturn(true); + when(mockNS.hasWriteLock(FSNamesystemLockMode.BM)).thenReturn(true); + when(mockNS.hasReadLock(FSNamesystemLockMode.BM)).thenReturn(true); + when(mockNS.hasWriteLock(FSNamesystemLockMode.FS)).thenReturn(true); + when(mockNS.hasReadLock(FSNamesystemLockMode.FS)).thenReturn(true); BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration()); LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org