This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch ozone-1.3 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 36490993db7fd123fe29e63848da6d4185aa7cbf Author: Stephen O'Donnell <[email protected]> AuthorDate: Wed Oct 19 09:53:30 2022 +0100 HDDS-7341. EC: Close pipelines with unregistered nodes (#3850) --- .../hdds/scm/pipeline/PipelineManagerImpl.java | 40 ++++++++++++++++++--- .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 41 +++++++++++++++++----- 2 files changed, 68 insertions(+), 13 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index aaa3088f9a..044cbc0166 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -509,22 +509,52 @@ public class PipelineManagerImpl implements PipelineManager { if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED && (currentTime.toEpochMilli() - p.getCreationTimestamp() .toEpochMilli() >= pipelineScrubTimeoutInMills)) { - LOG.info("Scrubbing pipeline: id: " + p.getId().toString() + - " since it stays at ALLOCATED stage for " + + LOG.info("Scrubbing pipeline: id: {} since it stays at ALLOCATED " + + "stage for {} mins.", p.getId(), Duration.between(currentTime, p.getCreationTimestamp()) - .toMinutes() + " mins."); + .toMinutes()); closePipeline(p, false); } // scrub pipelines who stay CLOSED for too long. if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) { - LOG.info("Scrubbing pipeline: id: " + p.getId().toString() + - " since it stays at CLOSED stage."); + LOG.info("Scrubbing pipeline: id: {} since it stays at CLOSED stage.", + p.getId()); closeContainersForPipeline(p.getId()); removePipeline(p); } + // If a datanode is stopped and then SCM is restarted, a pipeline can get + // stuck in an open state. For Ratis, provided some other DNs that were + // part of the open pipeline register to SCM after the restart, the Ratis + // pipeline close will get triggered by the DNs. For EC that will never + // happen, as the DNs are not aware of the pipeline. Therefore we should + // close any pipelines in the scrubber if they have nodes which are not + // registered + if (isOpenWithUnregisteredNodes(p)) { + LOG.info("Scrubbing pipeline: id: {} as it has unregistered nodes", + p.getId()); + closeContainersForPipeline(p.getId()); + closePipeline(p, true); + } } } + /** + * @param pipeline The pipeline to check + * @return True if the pipeline is open and contains unregistered nodes. False + * otherwise. + */ + private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) { + if (!pipeline.isOpen()) { + return false; + } + for (DatanodeDetails dn : pipeline.getNodes()) { + if (nodeManager.getNodeByUuid(dn.getUuidString()) == null) { + return true; + } + } + return false; + } + /** * Schedules a fixed interval job to create pipelines. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index 8fa45f5842..1ed9b845ac 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -29,12 +30,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.PipelineChoosePolicy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer; import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub; @@ -125,6 +128,10 @@ public class TestPipelineManagerImpl { GenericTestUtils.getRandomizedTempPath()); scm = HddsTestUtils.getScm(conf); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + // Mock Node Manager is not able to correctly set up things for the EC + // placement policy (Rack Scatter), so just use the random one. + conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY, + SCMContainerPlacementRandom.class.getName()); dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition()); nodeManager = new MockNodeManager(true, 20); maxPipelineCount = nodeManager.getNodeCount( @@ -151,7 +158,7 @@ public class TestPipelineManagerImpl { throws IOException { return PipelineManagerImpl.newPipelineManager(conf, SCMHAManagerStub.getInstance(isLeader), - new MockNodeManager(true, 20), + nodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue(), scmContext, @@ -163,7 +170,7 @@ public class TestPipelineManagerImpl { boolean isLeader, SCMHADBTransactionBuffer buffer) throws IOException { return PipelineManagerImpl.newPipelineManager(conf, SCMHAManagerStub.getInstance(isLeader, buffer), - new MockNodeManager(true, 20), + nodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue(), SCMContext.emptyContext(), @@ -341,7 +348,6 @@ public class TestPipelineManagerImpl { @Test public void testRemovePipeline() throws Exception { PipelineManagerImpl pipelineManager = createPipelineManager(true); - pipelineManager.setScmContext(scmContext); // Create a pipeline Pipeline pipeline = pipelineManager.createPipeline( RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); @@ -391,7 +397,6 @@ public class TestPipelineManagerImpl { @Test public void testClosePipelineShouldFailOnFollower() throws Exception { PipelineManagerImpl pipelineManager = createPipelineManager(true); - pipelineManager.setScmContext(scmContext); Pipeline pipeline = pipelineManager.createPipeline( RatisReplicationConfig.getInstance(ReplicationFactor.THREE)); Assertions.assertEquals(1, pipelineManager.getPipelines().size()); @@ -413,7 +418,6 @@ public class TestPipelineManagerImpl { @Test public void testPipelineReport() throws Exception { PipelineManagerImpl pipelineManager = createPipelineManager(true); - pipelineManager.setScmContext(scmContext); SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(conf, new ArrayList<>(), null, pipelineManager, new EventQueue(), serviceManager, scmContext); @@ -571,7 +575,6 @@ public class TestPipelineManagerImpl { OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 50, TimeUnit.SECONDS); PipelineManagerImpl pipelineManager = createPipelineManager(true); - pipelineManager.setScmContext(scmContext); Pipeline allocatedPipeline = pipelineManager .createPipeline(RatisReplicationConfig .getInstance(ReplicationFactor.THREE)); @@ -628,13 +631,36 @@ public class TestPipelineManagerImpl { pipelineManager.close(); } + @Test + public void testScrubOpenWithUnregisteredNodes() throws Exception { + PipelineManagerImpl pipelineManager = createPipelineManager(true); + Pipeline pipeline = pipelineManager + .createPipeline(new ECReplicationConfig(3, 2)); + pipelineManager.openPipeline(pipeline.getId()); + + // Scrubbing the pipelines should not affect this pipeline + pipelineManager.scrubPipelines(); + pipeline = pipelineManager.getPipeline(pipeline.getId()); + Assertions.assertEquals(Pipeline.PipelineState.OPEN, + pipeline.getPipelineState()); + + // Now, "unregister" one of the nodes in the pipeline + DatanodeDetails firstDN = nodeManager.getNodeByUuid( + pipeline.getNodes().get(0).getUuidString()); + nodeManager.getClusterNetworkTopologyMap().remove(firstDN); + + pipelineManager.scrubPipelines(); + pipeline = pipelineManager.getPipeline(pipeline.getId()); + Assertions.assertEquals(Pipeline.PipelineState.CLOSED, + pipeline.getPipelineState()); + } + @Test public void testScrubPipelinesShouldFailOnFollower() throws Exception { conf.setTimeDuration( OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, 10, TimeUnit.SECONDS); PipelineManagerImpl pipelineManager = createPipelineManager(true); - pipelineManager.setScmContext(scmContext); Pipeline pipeline = pipelineManager .createPipeline(RatisReplicationConfig .getInstance(ReplicationFactor.THREE)); @@ -765,7 +791,6 @@ public class TestPipelineManagerImpl { GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer .captureLogs(LoggerFactory.getLogger(PipelineManagerImpl.class)); PipelineManagerImpl pipelineManager = createPipelineManager(true); - pipelineManager.setScmContext(scmContext); Pipeline pipeline = pipelineManager.createPipeline( RatisReplicationConfig .getInstance(HddsProtos.ReplicationFactor.THREE)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
