[ https://issues.apache.org/jira/browse/HDFS-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606869#comment-17606869 ]

ASF GitHub Bot commented on HDFS-16774:
---------------------------------------

Hexiaoqiao commented on code in PR #4903:
URL: https://github.com/apache/hadoop/pull/4903#discussion_r974869052


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Wait here so the replica removal is delayed until the test
+          // releases the semaphore.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());

Review Comment:
   +1



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Wait here so the replica removal is delayed until the test
+          // releases the semaphore.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Test get blocks and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = com.google.common.collect.Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      assertTrue(uuids.contains(dn.getDatanodeUuid()));
+
+      // Verify that the replica has not yet been removed from memory. The
+      // namenode still holds this replica's location, so a client will try to
+      // read it; if the replica had already been removed from memory, the
+      // client would get a ReplicaNotFoundException.
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+
+      // Let the delayed removeReplicaFromMem call resume.
+      semaphore.release(1);
+
+      // Wait until the datanode has completed all pending async deletions.
+      GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return ds.asyncDiskService.countPendingDeletions() == 0;
+        }
+      }, 100, 1000);
+
+      // Sleep for two heartbeat intervals (the default heartbeat
+      // interval is 3 seconds).
+      Thread.sleep(6000);
+
+      // Test get blocks and datanodes again.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(2, loc.length);
+      uuids = com.google.common.collect.Lists.newArrayList();

Review Comment:
   Same as above comment.



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {

Review Comment:
   IMO, it would be more appropriate to move this method to the `FsDatasetImpl` class. What do you think?
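
   A rough sketch of the shape I have in mind (illustrative only -- simplified, self-contained types, not the real `FsDatasetImpl` internals):
   ```java
   import java.io.File;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Sketch: the dataset owns the in-memory replica bookkeeping, and the
   // async delete task calls back into it before touching the disk.
   class FsDatasetImplSketch {
     private final Map<Long, Object> replicaMap = new ConcurrentHashMap<>();

     /** Remove the replica from the in-memory map; false if it was absent. */
     boolean removeReplicaFromMem(long blockId) {
       return replicaMap.remove(blockId) != null;
     }
   }

   class ReplicaFileDeleteTaskSketch implements Runnable {
     private final FsDatasetImplSketch dataset;
     private final long blockId;
     private final File blockFile;

     ReplicaFileDeleteTaskSketch(FsDatasetImplSketch dataset, long blockId,
         File blockFile) {
       this.dataset = dataset;
       this.blockId = blockId;
       this.blockFile = blockFile;
     }

     @Override
     public void run() {
       // Drop the in-memory state via the dataset first, then do the disk IO;
       // skip the file deletion if the replica was already gone.
       if (dataset.removeReplicaFromMem(blockId)) {
         blockFile.delete();
       }
     }
   }
   ```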



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Wait here so the replica removal is delayed until the test
+          // releases the semaphore.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Test get blocks and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = com.google.common.collect.Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      assertTrue(uuids.contains(dn.getDatanodeUuid()));
+
+      // Verify that the replica has not yet been removed from memory. The
+      // namenode still holds this replica's location, so a client will try to
+      // read it; if the replica had already been removed from memory, the
+      // client would get a ReplicaNotFoundException.
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+
+      // Let the delayed removeReplicaFromMem call resume.
+      semaphore.release(1);
+
+      // Wait until the datanode has completed all pending async deletions.
+      GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return ds.asyncDiskService.countPendingDeletions() == 0;
+        }
+      }, 100, 1000);
+
+      // Sleep for two heartbeat intervals (the default heartbeat
+      // interval is 3 seconds).
+      Thread.sleep(6000);

Review Comment:
   Could this hit unexpected failures if the default heartbeat interval changes? I think we could read the heartbeat interval from the config here instead of hardcoding it.
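
   For example, something like this (a sketch; assumes the test's `conf` object is in scope):
   ```java
   import java.util.concurrent.TimeUnit;

   // Read the configured heartbeat interval rather than assuming the
   // 3-second default, so the test stays correct if the default changes.
   long heartbeatIntervalSecs = conf.getTimeDuration(
       DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
       DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
       TimeUnit.SECONDS);
   // Wait for two heartbeats instead of a hardcoded 6000 ms.
   Thread.sleep(2 * heartbeatIntervalSecs * 1000);
   ```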



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java:
##########
@@ -2310,10 +2310,10 @@ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
       throws IOException {
     final List<String> errors = new ArrayList<String>();
     for (int i = 0; i < invalidBlks.length; i++) {
-      final ReplicaInfo removing;
+      final ReplicaInfo info;

Review Comment:
   Is it possible to move segment L2313~L2349 to the async path as well? It includes some IO requests. (This is not a blocker, though.)
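
   Roughly this shape (illustrative only -- hypothetical helper names and simplified types, not the actual `FsDatasetImpl` code):
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Sketch: invalidate() only enqueues work; the IO-bearing segment runs on
   // the async disk service's executor instead of on the caller's thread.
   class AsyncInvalidateSketch {
     private final ExecutorService asyncDiskService =
         Executors.newSingleThreadExecutor();

     void invalidate(String bpid, long[] blockIds) {
       for (long blockId : blockIds) {
         asyncDiskService.execute(() -> {
           removeReplicaFromMem(bpid, blockId); // in-memory bookkeeping
           deleteReplicaFiles(bpid, blockId);   // disk IO, now off-thread
         });
       }
     }

     // Hypothetical stand-ins for the real dataset operations.
     private void removeReplicaFromMem(String bpid, long blockId) { }
     private void deleteReplicaFiles(String bpid, long blockId) { }
   }
   ```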





> Improve async delete replica on datanode
> ----------------------------------------
>
>                 Key: HDFS-16774
>                 URL: https://issues.apache.org/jira/browse/HDFS-16774
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>            Reporter: Haiyang Hu
>            Assignee: Haiyang Hu
>            Priority: Major
>              Labels: pull-request-available
>
> In our online cluster, a large number of ReplicaNotFoundExceptions occur when 
> clients read data.
> After tracing the root cause, we found that the asynchronous replica deletion 
> had stacked up many pending deletions, and this backlog caused the 
> ReplicaNotFoundExceptions.
> Currently, the asynchronous replica deletion proceeds as follows:
> 1. Remove the replica from the ReplicaMap.
> 2. Delete the replica file on the disk [queued in the thread pool].
> 3. Notify the namenode through an IBR [queued in the thread pool].
> To avoid similar problems as much as possible, consider optimizing the 
> execution flow so that removing the replica from the ReplicaMap, deleting the 
> replica file from disk, and notifying the namenode through an IBR are all 
> handled in the same asynchronous thread, as sketched below.
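> A minimal sketch of the proposed flow (illustrative pseudocode with 
> hypothetical helper names, not the actual patch):
> {code:java}
> // One async task per replica: the replica stays visible in the ReplicaMap
> // until this task actually runs, so readers no longer hit
> // ReplicaNotFoundException while deletions are stacked up in the queue.
> asyncDiskService.execute(() -> {
>   removeReplicaFromMem(block);       // 1. drop from the ReplicaMap
>   deleteReplicaFilesFromDisk(block); // 2. delete the block and meta files
>   notifyNamenodeDeletedBlock(block); // 3. IBR to the namenode
> });
> {code}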



