[ 
https://issues.apache.org/jira/browse/HDFS-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834472#comment-17834472
 ] 

ASF GitHub Bot commented on HDFS-17453:
---------------------------------------

goiri commented on code in PR #6708:
URL: https://github.com/apache/hadoop/pull/6708#discussion_r1554465107


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java:
##########
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws 
InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      Thread.sleep(1000);

Review Comment:
   Can we do better than sleep?



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java:
##########
@@ -95,16 +95,27 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
   
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    long genStamp = block.getGenerationStamp();
+    Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
           .getBlockId()));
-      getBlockQueue(blkId).add(
-          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+      queue = getBlockQueue(blkId);
     } else {
       block = new Block(block);
-      getBlockQueue(block).add(
-          new ReportedBlockInfo(storageInfo, block, reportedState));
+      queue = getBlockQueue(block);
     }
+    // We only want the latest non-future reported block to be queued for each
+    // DataNode. Otherwise, there can be a race condition that causes an old
+    // reported block to be kept in the queue until the SNN switches to ANN and
+    // the old reported block will be processed and marked as corrupt by the 
ANN.
+    // See HDFS-17453
+    int size = queue.size();
+    if (queue.removeIf(rbi -> rbi.storageInfo.equals(storageInfo) &&

Review Comment:
   We could make this more robust to nulls with:
   ```
     void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
         ReplicaState reportedState) {
       if (storageInfo == null || block == null || reportedState == null) {
           return;
       }
       ...
       if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
       ...
   ```



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java:
##########
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws 
InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      Thread.sleep(1000);

Review Comment:
   Waiting with waitFor() on some condition would be better.





> IncrementalBlockReport can have race condition with Edit Log Tailer
> -------------------------------------------------------------------
>
>                 Key: HDFS-17453
>                 URL: https://issues.apache.org/jira/browse/HDFS-17453
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: auto-failover, ha, hdfs, namenode
>    Affects Versions: 3.3.0, 3.3.1, 2.10.2, 3.3.2, 3.3.5, 3.3.4, 3.3.6
>            Reporter: Danny Becker
>            Assignee: Danny Becker
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Summary
> There is a race condition between IncrementalBlockReports (IBR) and 
> EditLogTailer in Standby NameNode (SNN) which can lead to leaked IBRs and 
> false corrupt blocks after HA Failover. The race condition occurs when the 
> SNN loads the edit logs before it receives the block reports from DataNode 
> (DN).
> h2. Example
> In the following example there is a block (b1) with 3 generation stamps (gs1, 
> gs2, gs3).
>  # SNN1 loads edit logs for b1gs1 and b1gs2.
>  # DN1 sends the IBR for b1gs1 to SNN1.
>  # SNN1 will determine that the reported block b1gs1 from DN1 is corrupt and 
> it will be queued for later. 
> [BlockManager.java|https://github.com/apache/hadoop/blob/6ed73896f6e8b4b7c720eff64193cb30b3e77fb2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java#L3447C1-L3464C6]
> {code:java}
>     BlockToMarkCorrupt c = checkReplicaCorrupt(
>         block, reportedState, storedBlock, ucState, dn);
>     if (c != null) {
>       if (shouldPostponeBlocksFromFuture) {
>         // If the block is an out-of-date generation stamp or state,
>         // but we're the standby, we shouldn't treat it as corrupt,
>         // but instead just queue it for later processing.
>         // Storing the reported block for later processing, as that is what
>         // comes from the IBR / FBR and hence what we should use to compare
>         // against the memory state.
>         // See HDFS-6289 and HDFS-15422 for more context.
>         queueReportedBlock(storageInfo, block, reportedState,
>             QUEUE_REASON_CORRUPT_STATE);
>       } else {
>         toCorrupt.add(c);
>       }
>       return storedBlock;
>     } {code}
>  # DN1 sends IBR for b1gs2 and b1gs3 to SNN1.
>  # SNN1 processes b1gs2 and updates the blocks map.
>  # SNN1 queues b1gs3 for later because it determines that b1gs3 is a future 
> genstamp.
>  # SNN1 loads b1gs3 edit logs and processes the queued reports for b1.
>  # SNN1 processes b1gs1 first and puts it back in the queue.
>  # SNN1 processes b1gs3 next and updates the blocks map.
>  # Later, SNN1 becomes the Active NameNode (ANN) during an HA Failover.
>  # SNN1 will catch up to the latest edit logs, then process all queued block 
> reports to become the ANN.
>  # ANN1 will process b1gs1 and mark it as corrupt.
> If the example above happens for every DN which stores b1, then when the HA 
> failover happens, b1 will be incorrectly marked as corrupt. This will be 
> fixed when the first DN sends a FullBlockReport or an IBR for b1.
> h2. Logs from Active Cluster
> I added the following logs to confirm this issue in an active cluster:
> {code:java}
> BlockToMarkCorrupt c = checkReplicaCorrupt(
>     block, reportedState, storedBlock, ucState, dn);
> if (c != null) {
>   DatanodeStorageInfo storedStorageInfo = storedBlock.findStorageInfo(dn);
>   LOG.info("Found corrupt block {} [{}, {}] from DN {}. Stored block {} from 
> DN {}",
>       block, reportedState.name(), ucState.name(), storageInfo, storedBlock, 
> storedStorageInfo);
>   if (storageInfo.equals(storedStorageInfo) &&
>         storedBlock.getGenerationStamp() > block.getGenerationStamp()) {
>     LOG.info("Stored Block {} from the same DN {} has a newer GenStamp.",
>         storedBlock, storedStorageInfo);
>   }
>   if (shouldPostponeBlocksFromFuture) {
>     // If the block is an out-of-date generation stamp or state,
>     // but we're the standby, we shouldn't treat it as corrupt,
>     // but instead just queue it for later processing.
>     // Storing the reported block for later processing, as that is what
>     // comes from the IBR / FBR and hence what we should use to compare
>     // against the memory state.
>     // See HDFS-6289 and HDFS-15422 for more context.
>     queueReportedBlock(storageInfo, block, reportedState,
>         QUEUE_REASON_CORRUPT_STATE);
>     LOG.info("Queueing the block {} for later processing", block);
>   } else {
>     toCorrupt.add(c);
>     LOG.info("Marking the block {} as corrupt", block);
>   }
>   return storedBlock;
> } {code}
>  
> Logs from nn1 (Active):
> {code:java}
> 2024-04-03T03:00:52.524-0700,INFO,[IPC Server handler 6 on default port 
> 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700910634,
>  newGS=65700925027, newLength=10485760, newNodes=[[DN1]:10010, [DN2]:10010, 
> [DN3]:10010, client=client1)"
> 2024-04-03T03:00:52.539-0700,INFO,[IPC Server handler 6 on default port 
> 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700910634
>  => blk_66092666802_65700925027) success"
> 2024-04-03T03:01:07.413-0700,INFO,[IPC Server handler 6 on default port 
> 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700925027,
>  newGS=65700933553, newLength=20971520, newNodes=[[DN1]:10010, [DN2]:10010, 
> [DN3]:10010, client=client1)"
> 2024-04-03T03:01:07.413-0700,INFO,[IPC Server handler 6 on default port 
> 443],org.apache.hadoop.hdfs.server.namenode.FSNamesystem,"updatePipeline(blk_66092666802_65700925027
>  => blk_66092666802_65700933553) success" {code}
>  
> Logs from nn2 (Standby):
> {code:java}
> 2024-04-03T03:01:23.067-0700,INFO,[Block report 
> processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found 
> corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN 
> [DISK]DS-1:NORMAL:[DN1]:10010. Stored block blk_66092666802_65700933553 from 
> DN null"
> 2024-04-03T03:01:23.067-0700,INFO,[Block report 
> processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Queueing
>  the block blk_66092666802_65700925027 for later processing"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report 
> processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found 
> corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN 
> [DISK]DS-3:NORMAL:[DN3]:10010. Stored block blk_66092666802_65700933553 from 
> DN null"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report 
> processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Queueing
>  the block blk_66092666802_65700925027 for later processing"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report 
> processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found 
> corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN 
> [DISK]DS-2:NORMAL:[DN2]:10010. Stored block blk_66092666802_65700933553 from 
> DN null"
> 2024-04-03T03:01:24.159-0700,INFO,[Block report 
> processor],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Queueing
>  the block blk_66092666802_65700925027 for later processing" {code}
>  
> Logs from nn2 when it transitions to Active:
> {code:java}
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found 
> corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN 
> [DISK]DS-1:NORMAL:[DN1]:10010. Stored block blk_66092666802_65700933553 from 
> DN [DISK]DS-1:NORMAL:[DN1]:10010"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Stored 
> Block blk_66092666802_65700933553 from the same DN 
> [DISK]DS-1:NORMAL:[DN1]:10010 has a newer GenStamp."
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Marking the 
> block blk_66092666802_65700925027 as corrupt"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found 
> corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN 
> [DISK]DS-2:NORMAL:[DN2]:10010. Stored block blk_66092666802_65700933553 from 
> DN [DISK]DS-2:NORMAL:[DN2]:10010"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Stored 
> Block blk_66092666802_65700933553 from the same DN 
> [DISK]DS-2:NORMAL:[DN2]:10010 has a newer GenStamp."
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Marking the 
> block blk_66092666802_65700925027 as corrupt"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Found 
> corrupt block blk_66092666802_65700925027 [FINALIZED, COMPLETE] from DN 
> [DISK]DS-3:NORMAL:[DN3]:10010. Stored block blk_66092666802_65700933553 from 
> DN [DISK]DS-3:NORMAL:[DN3]:10010"
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Stored 
> Block blk_66092666802_65700933553 from the same DN 
> [DISK]DS-3:NORMAL:[DN3]:10010 has a newer GenStamp."
> 2024-04-03T15:39:09.050-0700,INFO,[IPC Server handler 8 on default port 
> 8020],org.apache.hadoop.hdfs.server.blockmanagement.BlockManager,"Marking the 
> block blk_66092666802_65700925027 as corrupt"
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to