[ https://issues.apache.org/jira/browse/HDFS-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834472#comment-17834472 ]
ASF GitHub Bot commented on HDFS-17453:
---------------------------------------

goiri commented on code in PR #6708:
URL: https://github.com/apache/hadoop/pull/6708#discussion_r1554465107


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java:
##########
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      Thread.sleep(1000);

Review Comment:
   Can we do better than sleep?

##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java:
##########
@@ -95,16 +95,27 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    long genStamp = block.getGenerationStamp();
+    Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
           .getBlockId()));
-      getBlockQueue(blkId).add(
-          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+      queue = getBlockQueue(blkId);
     } else {
       block = new Block(block);
-      getBlockQueue(block).add(
-          new ReportedBlockInfo(storageInfo, block, reportedState));
+      queue = getBlockQueue(block);
     }
+    // We only want the latest non-future reported block to be queued for each
+    // DataNode. Otherwise, there can be a race condition that causes an old
+    // reported block to be kept in the queue until the SNN switches to ANN and
+    // the old reported block will be processed and marked as corrupt by the ANN.
+    // See HDFS-17453
+    int size = queue.size();
+    if (queue.removeIf(rbi -> rbi.storageInfo.equals(storageInfo) &&

Review Comment:
   We could make this more robust to nulls with:
   ```
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
     if (storageInfo == null || block == null || reportedState == null) {
       return;
     }
     ...
     if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
   ```
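For reference, the `waitFor()` alternative suggested for the `Thread.sleep(1000)` above would look roughly like the sketch below, using `org.apache.hadoop.test.GenericTestUtils`. The helper `spiesAreWiredUp()` is hypothetical and only stands in for whatever condition the fixed sleep is actually giving time to settle:

```java
// Sketch only: poll a condition instead of sleeping a fixed 1000 ms.
// GenericTestUtils.waitFor(check, checkEveryMillis, waitForMillis) re-evaluates
// the supplier until it returns true, or throws TimeoutException once the
// overall timeout elapses.
// spiesAreWiredUp() is a hypothetical predicate; the real test should wait on
// whatever state the original Thread.sleep(1000) was intended to cover.
GenericTestUtils.waitFor(() -> spiesAreWiredUp(), 100, 10000);
```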
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java:
##########
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      Thread.sleep(1000);

Review Comment:
   waitFor() something is better.


> IncrementalBlockReport can have race condition with Edit Log Tailer
> -------------------------------------------------------------------
>
>                 Key: HDFS-17453
>                 URL: https://issues.apache.org/jira/browse/HDFS-17453
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: auto-failover, ha, hdfs, namenode
>    Affects Versions: 3.3.0, 3.3.1, 2.10.2, 3.3.2, 3.3.5, 3.3.4, 3.3.6
>            Reporter: Danny Becker
>            Assignee: Danny Becker
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Summary
> There is a race condition between IncrementalBlockReports (IBR) and EditLogTailer in Standby NameNode (SNN) which can lead to leaked IBRs and false corrupt blocks after HA Failover. The race condition occurs when the SNN loads the edit logs before it receives the block reports from DataNode (DN).
> h2. Example
> In the following example there is a block (b1) with 3 generation stamps (gs1, gs2, gs3).
> # SNN1 loads edit logs for b1gs1 and b1gs2.
> # DN1 sends the IBR for b1gs1 to SNN1.
> # SNN1 will determine that the reported block b1gs1 from DN1 is corrupt and it will be queued for later. [BlockManager.java|https://github.com/apache/hadoop/blob/6ed73896f6e8b4b7c720eff64193cb30b3e77fb2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java#L3447C1-L3464C6]
> {code:java}
> BlockToMarkCorrupt c = checkReplicaCorrupt(
>     block, reportedState, storedBlock, ucState, dn);
> if (c != null) {
>   if (shouldPostponeBlocksFromFuture) {
>     // If the block is an out-of-date generation stamp or state,
>     // but we're the standby, we shouldn't treat it as corrupt,
>     // but instead just queue it for later processing.
>     // Storing the reported block for later processing, as that is what
>     // comes from the IBR / FBR and hence what we should use to compare
>     // against the memory state.
>     // See HDFS-6289 and HDFS-15422 for more context.
>     queueReportedBlock(storageInfo, block, reportedState,
>         QUEUE_REASON_CORRUPT_STATE);
>   } else {
>     toCorrupt.add(c);
>   }
>   return storedBlock;
> }
> {code}
> # DN1 sends IBR for b1gs2 and b1gs3 to SNN1.
> # SNN1 processes b1gs2 and updates the blocks map.
> # SNN1 queues b1gs3 for later because it determines that b1gs3 is a future genstamp.
> # SNN1 loads b1gs3 edit logs and processes the queued reports for b1.
> # SNN1 processes b1gs1 first and puts it back in the queue.
> # SNN1 processes b1gs3 next and updates the blocks map.
> # Later, SNN1 becomes the Active NameNode (ANN) during an HA Failover.
> # SNN1 will catch up to the latest edit logs, then process all queued block reports to become the ANN.
> # ANN1 will process b1gs1 and mark it as corrupt.
> If the example above happens for every DN which stores b1, then when the HA failover happens, b1 will be incorrectly marked as corrupt. This will be fixed when the first DN sends a FullBlockReport or an IBR for b1.
> h2. Logs from Active Cluster
> I added the following logs to confirm this issue in an active cluster:
> {code:java}
> BlockToMarkCorrupt c = checkReplicaCorrupt(
>     block, reportedState, storedBlock, ucState, dn);
> if (c != null) {
>   DatanodeStorageInfo storedStorageInfo = storedBlock.findStorageInfo(dn);
>   LOG.info("Found corrupt block {} [{}, {}] from DN {}. Stored block {} from DN {}",
>       block, reportedState.name(), ucState.name(), storageInfo, storedBlock, storedStorageInfo);
>   if (storageInfo.equals(storedStorageInfo) &&
>       storedBlock.getGenerationStamp() > block.getGenerationStamp()) {
>     LOG.info("Stored Block {} from the same DN {} has a newer GenStamp." +
>         storedBlock, storedStorageInfo);
>   }
>   if (shouldPostponeBlocksFromFuture) {
>     // If the block is an out-of-date generation stamp or state,
>     // but we're the standby, we shouldn't treat it as corrupt,
>     // but instead just queue it for later processing.
>     // Storing the reported block for later processing, as that is what
>     // comes from the IBR / FBR and hence what we should use to compare
>     // against the memory state.
>     // See HDFS-6289 and HDFS-15422 for more context.
>     queueReportedBlock(storageInfo, block, reportedState,
>         QUEUE_REASON_CORRUPT_STATE);
>     LOG.info("Queueing the block {} for later processing", block);
>   } else {
>     toCorrupt.add(c);
>     LOG.info("Marking the block {} as corrupt", block);
>   }
>   return storedBlock;
> }
> {code}
>
> Logs from nn1 (Active):
> {code:java}
> 2024-04-03T03:00:52.524-0700,INFO,[IPC Server handler 6 on default port 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700910634, newGS=65700925027, newLength=10485760, newNodes=[[DN1]:10010, [DN2]:10010, [DN3]:10010, client=client1)"
> 2024-04-03T03:00:52.539-0700,INFO,[IPC Server handler 6 on default port 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700910634 => blk_66092666802_65700925027) success"
> 2024-04-03T03:01:07.413-0700,INFO,[IPC Server handler 6 on default port 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700925027, newGS=65700933553, newLength=20971520, newNodes=[[DN1]:10010, [DN2]:10010, [DN3]:10010, client=client1)"
> 2024-04-03T03:01:07.413-0700,INFO,[IPC Server handler 6 on default port 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700925027 => blk_66092666802_65700933553) success"
> {code}
>
> Logs from nn2 (Standby):
> {code:java}
> 2024-04-03T03:01:23.067-0700,INFO,[Block report processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN [DISK]DS-1:NORMAL:[DN1]:10010. Stored block blk_66092666802_65700933553 from DN null"
> 2024-04-03T03:01:23.067-0700,INFO,[Block report processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Queueing the block blk_66092666802_65700925027 for later processing"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN [DISK]DS-3:NORMAL:[DN3]:10010. Stored block blk_66092666802_65700933553 from DN null"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Queueing the block blk_66092666802_65700925027 for later processing"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN [DISK]DS-2:NORMAL:[DN2]:10010. Stored block blk_66092666802_65700933553 from DN null"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Queueing the block blk_66092666802_65700925027 for later processing"
> {code}
>
> Logs from nn2 when it transitions to Active:
> {code:java}
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN [DISK]DS-1:NORMAL:[DN1]:10010. Stored block blk_66092666802_65700933553 from DN [DISK]DS-1:NORMAL:[DN1]:10010"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Stored Block blk_66092666802_65700933553 from the same DN [DISK]DS-1:NORMAL:[DN1]:10010 has a newer GenStamp."
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Marking the block blk_66092666802_65700925027 as corrupt"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN [DISK]DS-2:NORMAL:[DN2]:10010. Stored block blk_66092666802_65700933553 from DN [DISK]DS-2:NORMAL:[DN2]:10010"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Stored Block blk_66092666802_65700933553 from the same DN [DISK]DS-2:NORMAL:[DN2]:10010 has a newer GenStamp."
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Marking the block blk_66092666802_65700925027 as corrupt"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN [DISK]DS-3:NORMAL:[DN3]:10010. Stored block blk_66092666802_65700933553 from DN [DISK]DS-3:NORMAL:[DN3]:10010"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Stored Block blk_66092666802_65700933553 from the same DN [DISK]DS-3:NORMAL:[DN3]:10010 has a newer GenStamp."
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Marking the block blk_66092666802_65700925027 as corrupt"
> {code}
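To make the queue-deduplication idea from the patch concrete, here is a small standalone sketch in plain Java. The `QueuedReport` class and its field names are invented for illustration and are not the real `ReportedBlockInfo`/`PendingDataNodeMessages` types, and the removal predicate only approximates the change (whose full condition is truncated in the diff above): the point is that an older queued report from the same storage is dropped when a newer one arrives, so a stale IBR can no longer sit in the Standby's queue until failover and get the block marked corrupt.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

public class QueuedReportDedupSketch {

  /** Invented stand-in for ReportedBlockInfo: one queued report from one storage. */
  static final class QueuedReport {
    final String storageId;
    final long blockId;
    final long genStamp;

    QueuedReport(String storageId, long blockId, long genStamp) {
      this.storageId = storageId;
      this.blockId = blockId;
      this.genStamp = genStamp;
    }

    @Override
    public String toString() {
      return "blk_" + blockId + "_" + genStamp + " @ " + storageId;
    }
  }

  /**
   * Keep at most one queued report per storage for this block: any report from
   * the same storage with an older (or equal) generation stamp is dropped
   * before the new report is queued. Simplified approximation of the
   * HDFS-17453 change to PendingDataNodeMessages#enqueueReportedBlock.
   */
  static void enqueue(Queue<QueuedReport> queue, QueuedReport incoming) {
    queue.removeIf(queued -> Objects.equals(queued.storageId, incoming.storageId)
        && queued.genStamp <= incoming.genStamp);
    queue.add(incoming);
  }

  public static void main(String[] args) {
    Queue<QueuedReport> queue = new ArrayDeque<>();
    // The stale IBR (older generation stamp) is queued first on the Standby...
    enqueue(queue, new QueuedReport("DS-1", 66092666802L, 65700925027L));
    // ...and the IBR with the newer generation stamp then replaces it, so no
    // stale report is left to be replayed (and marked corrupt) at failover.
    enqueue(queue, new QueuedReport("DS-1", 66092666802L, 65700933553L));
    System.out.println(queue); // [blk_66092666802_65700933553 @ DS-1]
  }
}
```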