[ https://issues.apache.org/jira/browse/HDFS-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606869#comment-17606869 ]
ASF GitHub Bot commented on HDFS-16774: --------------------------------------- Hexiaoqiao commented on code in PR #4903: URL: https://github.com/apache/hadoop/pull/4903#discussion_r974869052 ########## hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java: ########## @@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException { assertEquals(3, metrics.getNativeCopyIoQuantiles().length); } } + + @Test + public void testAysncDiskServiceDeleteReplica() + throws IOException, InterruptedException, TimeoutException { + HdfsConfiguration conf = new HdfsConfiguration(); + // Bump up replication interval. + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + DistributedFileSystem fs = cluster.getFileSystem(); + String bpid = cluster.getNamesystem().getBlockPoolId(); + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); + final Semaphore semaphore = new Semaphore(0); + try { + cluster.waitActive(); + final DataNodeFaultInjector injector = new DataNodeFaultInjector() { + @Override + public void delayDeleteReplica() { + // Lets wait for the remove replia process + try { + semaphore.acquire(1); + } catch (InterruptedException e) { + // ignore + } + } + }; + DataNodeFaultInjector.set(injector); + + // Create file. + Path path = new Path("/testfile"); + DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0); + DFSTestUtil.waitReplication(fs, path, (short) 3); + LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0); + ExtendedBlock extendedBlock = lb.getBlock(); + DatanodeInfo[] loc = lb.getLocations(); + assertEquals(3, loc.length); + + // DN side. + DataNode dn = cluster.getDataNode(loc[0].getIpcPort()); + final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); + List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock()); Review Comment: +1 ########## hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java: ########## @@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException { assertEquals(3, metrics.getNativeCopyIoQuantiles().length); } } + + @Test + public void testAysncDiskServiceDeleteReplica() + throws IOException, InterruptedException, TimeoutException { + HdfsConfiguration conf = new HdfsConfiguration(); + // Bump up replication interval. + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + DistributedFileSystem fs = cluster.getFileSystem(); + String bpid = cluster.getNamesystem().getBlockPoolId(); + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); + final Semaphore semaphore = new Semaphore(0); + try { + cluster.waitActive(); + final DataNodeFaultInjector injector = new DataNodeFaultInjector() { + @Override + public void delayDeleteReplica() { + // Lets wait for the remove replia process + try { + semaphore.acquire(1); + } catch (InterruptedException e) { + // ignore + } + } + }; + DataNodeFaultInjector.set(injector); + + // Create file. + Path path = new Path("/testfile"); + DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0); + DFSTestUtil.waitReplication(fs, path, (short) 3); + LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0); + ExtendedBlock extendedBlock = lb.getBlock(); + DatanodeInfo[] loc = lb.getLocations(); + assertEquals(3, loc.length); + + // DN side. + DataNode dn = cluster.getDataNode(loc[0].getIpcPort()); + final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); + List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock()); + assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId())); + ds.invalidate(bpid, blockList.toArray(new Block[0])); + + // Test get blocks and datanodes. + loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations(); + assertEquals(3, loc.length); + List<String> uuids = com.google.common.collect.Lists.newArrayList(); + for (DatanodeInfo datanodeInfo : loc) { + uuids.add(datanodeInfo.getDatanodeUuid()); + } + assertTrue(uuids.contains(dn.getDatanodeUuid())); + + // Do verification that the first replication shouldn't be deleted from the memory first. + // Because the namenode still contains this replica, so client will try to read it. + // If this replica is deleted from memory, the client would got an ReplicaNotFoundException. + assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId())); + + // Make it resume the removeReplicaFromMem method + semaphore.release(1); + + // Sleep for 1 second so that datanode can complete invalidate. + GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() { + @Override public Boolean get() { + return ds.asyncDiskService.countPendingDeletions() == 0; + } + }, 100, 1000); + + // Sleep for two heartbeat times (default a heartbeat interval is 3 second). + Thread.sleep(6000); + + // Test get blocks and datanodes again. + loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations(); + assertEquals(2, loc.length); + uuids = com.google.common.collect.Lists.newArrayList(); Review Comment: Same as above comment. ########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java: ########## @@ -359,6 +371,89 @@ public void run() { IOUtils.cleanupWithLogger(null, this.volumeRef); } } + + private boolean removeReplicaFromMem() { Review Comment: IMO, it is more proper to move this method to class `FsDatasetImpl`, what do you think about? ########## hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java: ########## @@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException { assertEquals(3, metrics.getNativeCopyIoQuantiles().length); } } + + @Test + public void testAysncDiskServiceDeleteReplica() + throws IOException, InterruptedException, TimeoutException { + HdfsConfiguration conf = new HdfsConfiguration(); + // Bump up replication interval. + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + DistributedFileSystem fs = cluster.getFileSystem(); + String bpid = cluster.getNamesystem().getBlockPoolId(); + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); + final Semaphore semaphore = new Semaphore(0); + try { + cluster.waitActive(); + final DataNodeFaultInjector injector = new DataNodeFaultInjector() { + @Override + public void delayDeleteReplica() { + // Lets wait for the remove replia process + try { + semaphore.acquire(1); + } catch (InterruptedException e) { + // ignore + } + } + }; + DataNodeFaultInjector.set(injector); + + // Create file. + Path path = new Path("/testfile"); + DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0); + DFSTestUtil.waitReplication(fs, path, (short) 3); + LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0); + ExtendedBlock extendedBlock = lb.getBlock(); + DatanodeInfo[] loc = lb.getLocations(); + assertEquals(3, loc.length); + + // DN side. + DataNode dn = cluster.getDataNode(loc[0].getIpcPort()); + final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); + List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock()); + assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId())); + ds.invalidate(bpid, blockList.toArray(new Block[0])); + + // Test get blocks and datanodes. + loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations(); + assertEquals(3, loc.length); + List<String> uuids = com.google.common.collect.Lists.newArrayList(); + for (DatanodeInfo datanodeInfo : loc) { + uuids.add(datanodeInfo.getDatanodeUuid()); + } + assertTrue(uuids.contains(dn.getDatanodeUuid())); + + // Do verification that the first replication shouldn't be deleted from the memory first. + // Because the namenode still contains this replica, so client will try to read it. + // If this replica is deleted from memory, the client would got an ReplicaNotFoundException. + assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId())); + + // Make it resume the removeReplicaFromMem method + semaphore.release(1); + + // Sleep for 1 second so that datanode can complete invalidate. + GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() { + @Override public Boolean get() { + return ds.asyncDiskService.countPendingDeletions() == 0; + } + }, 100, 1000); + + // Sleep for two heartbeat times (default a heartbeat interval is 3 second). + Thread.sleep(6000); Review Comment: Is it possible to meet some unexpected case if the default heartbeat interval changes? I think we could get heartbeat config item here. ########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java: ########## @@ -2310,10 +2310,10 @@ private void invalidate(String bpid, Block[] invalidBlks, boolean async) throws IOException { final List<String> errors = new ArrayList<String>(); for (int i = 0; i < invalidBlks.length; i++) { - final ReplicaInfo removing; + final ReplicaInfo info; Review Comment: Is it possible to move segment L2313~L2349 to async also? Because it includes some IO request here. # this is not blocker issue. > Improve async delete replica on datanode > ---------------------------------------- > > Key: HDFS-16774 > URL: https://issues.apache.org/jira/browse/HDFS-16774 > Project: Hadoop HDFS > Issue Type: Improvement > Reporter: Haiyang Hu > Assignee: Haiyang Hu > Priority: Major > Labels: pull-request-available > > In our online cluster, a large number of ReplicaNotFoundExceptions occur when > client reads the data. > After tracing the root cause, it is caused by the asynchronous deletion of > the replica operation has many stacked pending deletion caused > ReplicationNotFoundException. > Current the asynchronous delete of the replica operation process is as > follows: > 1.remove the replica from the ReplicaMap > 2.delete the replica file on the disk [blocked in threadpool] > 3.notifying namenode through IBR [blocked in threadpool] > In order to avoid similar problems as much as possible, consider optimizing > the execution flow: > The deleting replica from ReplicaMap, deleting replica from disk and > notifying namenode through IBR are processed in the same asynchronous thread. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org