goiri commented on code in PR #6708:
URL: https://github.com/apache/hadoop/pull/6708#discussion_r1554465107


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java:
##########
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws 
InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      Thread.sleep(1000);

Review Comment:
   Can we do better than sleep?



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java:
##########
@@ -95,16 +95,27 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
   
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    long genStamp = block.getGenerationStamp();
+    Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
           .getBlockId()));
-      getBlockQueue(blkId).add(
-          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+      queue = getBlockQueue(blkId);
     } else {
       block = new Block(block);
-      getBlockQueue(block).add(
-          new ReportedBlockInfo(storageInfo, block, reportedState));
+      queue = getBlockQueue(block);
     }
+    // We only want the latest non-future reported block to be queued for each
+    // DataNode. Otherwise, there can be a race condition that causes an old
+    // reported block to be kept in the queue until the SNN switches to ANN and
+    // the old reported block will be processed and marked as corrupt by the 
ANN.
+    // See HDFS-17453
+    int size = queue.size();
+    if (queue.removeIf(rbi -> rbi.storageInfo.equals(storageInfo) &&

Review Comment:
   We could make this more robust to nulls with:
   ```
     void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
         ReplicaState reportedState) {
       if (storageInfo == null || block == null || reportedState == null) {
           return;
       }
       ...
       if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
       ...
   ```



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java:
##########
@@ -215,4 +229,95 @@ public void testReplaceReceivedBlock() throws 
InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      Thread.sleep(1000);

Review Comment:
   Polling for the condition with `GenericTestUtils.waitFor()` would be better than a fixed sleep.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to