This is an automated email from the ASF dual-hosted git repository. sammichen pushed a commit to branch HDDS-1564 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 2a3638dddd7239b5f9407eb4ed9d53b6c3fa5310 Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Wed Nov 27 20:18:50 2019 +0800 HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline. (#237) --- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 ++++ .../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 31 +++++++++++++++- hadoop-hdds/common/src/main/proto/hdds.proto | 1 + .../common/src/main/resources/ozone-default.xml | 12 ++++++ .../scm/pipeline/BackgroundPipelineCreator.java | 8 +++- .../hadoop/hdds/scm/pipeline/PipelineManager.java | 3 ++ .../hdds/scm/pipeline/SCMPipelineManager.java | 26 +++++++++++++ .../scm/pipeline/MockRatisPipelineProvider.java | 28 ++++++++++++++ .../scm/pipeline/TestRatisPipelineProvider.java | 12 +++++- .../hdds/scm/pipeline/TestSCMPipelineManager.java | 43 +++++++++++++++++++++- 10 files changed, 166 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 96a3f5d..0049a94 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -333,6 +333,13 @@ public final class ScmConfigKeys { OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY = "ozone.scm.keyvalue.container.deletion-choosing.policy"; + // Max timeout for pipeline to stay at ALLOCATED state before scrubbed. 
+ public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT = + "ozone.scm.pipeline.allocated.timeout"; + + public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT = + "5m"; + public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT = "ozone.scm.container.creation.lease.timeout"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 66b1efa..21aee42 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -55,6 +55,8 @@ public final class Pipeline { private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>(); // Current reported Leader for the pipeline private UUID leaderId; + // Timestamp for pipeline upon creation + private Long creationTimestamp; /** * The immutable properties of pipeline object is used in @@ -69,6 +71,7 @@ public final class Pipeline { this.factor = factor; this.state = state; this.nodeStatus = nodeStatus; + this.creationTimestamp = System.currentTimeMillis(); } /** @@ -108,6 +111,24 @@ public final class Pipeline { } /** + * Return the creation time of pipeline. + * + * @return Creation Timestamp + */ + public Long getCreationTimestamp() { + return creationTimestamp; + } + + /** + * Set the creation timestamp. Only for protobuf now. + * + * @param creationTimestamp + */ + void setCreationTimestamp(Long creationTimestamp) { + this.creationTimestamp = creationTimestamp; + } + + /** * Return the pipeline leader's UUID. * * @return DatanodeDetails.UUID. @@ -201,6 +222,7 @@ public final class Pipeline { .setFactor(factor) .setState(PipelineState.getProtobuf(state)) .setLeaderID(leaderId != null ? 
leaderId.toString() : "") + .setCreationTimeStamp(creationTimestamp) .addAllMembers(nodeStatus.keySet().stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())); @@ -280,7 +302,8 @@ public final class Pipeline { b.append(", Factor:").append(getFactor()); b.append(", State:").append(getPipelineState()); b.append(", leaderId:").append(getLeaderId()); - b.append(" ]"); + b.append(", CreationTimestamp").append(getCreationTimestamp()); + b.append("]"); return b.toString(); } @@ -304,6 +327,7 @@ public final class Pipeline { private List<Integer> nodeOrder = null; private List<DatanodeDetails> nodesInOrder = null; private UUID leaderId = null; + private Long creationTimestamp = null; public Builder() {} @@ -315,6 +339,7 @@ public final class Pipeline { this.nodeStatus = pipeline.nodeStatus; this.nodesInOrder = pipeline.nodesInOrder.get(); this.leaderId = pipeline.getLeaderId(); + this.creationTimestamp = pipeline.getCreationTimestamp(); } public Builder setId(PipelineID id1) { @@ -361,6 +386,10 @@ public final class Pipeline { Preconditions.checkNotNull(nodeStatus); Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus); pipeline.setLeaderId(leaderId); + // overwrite with original creationTimestamp + if (creationTimestamp != null) { + pipeline.setCreationTimestamp(creationTimestamp); + } if (nodeOrder != null && !nodeOrder.isEmpty()) { // This branch is for build from ProtoBuf diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 39a01dc..b313604 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -75,6 +75,7 @@ message Pipeline { required PipelineID id = 5; optional string leaderID = 6; repeated uint32 memberOrders = 7; + optional uint64 creationTimeStamp = 8; } message KeyValue { diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 
300955f..ded565c 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -848,6 +848,18 @@ </description> </property> <property> + <name>ozone.scm.pipeline.allocated.timeout</name> + <value>5m</value> + <tag>OZONE, SCM, PIPELINE</tag> + <description> + Timeout for every pipeline to stay in ALLOCATED stage. When pipeline is created, + it should be at OPEN stage once pipeline report is successfully received by SCM. + If a pipeline stays at ALLOCATED for too long, it should be scrubbed so that new + pipeline can be created. This timeout is for how long pipeline can stay at ALLOCATED + stage until it gets scrubbed. + </description> + </property> + <property> <name>ozone.scm.container.size</name> <value>5GB</value> <tag>OZONE, PERFORMANCE, MANAGEMENT</tag> diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java index a4b3f85..927fe4f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java @@ -110,12 +110,18 @@ class BackgroundPipelineCreator { // Skip this iteration for creating pipeline continue; } + + try { + pipelineManager.scrubPipeline(type, factor); + } catch (IOException e) { + LOG.error("Error while scrubbing pipelines {}", e); + } + while (true) { try { if (scheduler.isClosed()) { break; } - pipelineManager.createPipeline(type, factor); } catch (IOException ioe) { break; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 779008f..44432d9 100644 --- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -75,6 +75,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean { void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) throws IOException; + void scrubPipeline(ReplicationType type, ReplicationFactor factor) + throws IOException; + void startPipelineCreator(); void triggerPipelineCreation(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index f44962e..e89206d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -54,6 +54,7 @@ import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB; @@ -341,6 +342,31 @@ public class SCMPipelineManager implements PipelineManager { } @Override + public void scrubPipeline(ReplicationType type, ReplicationFactor factor) + throws IOException{ + if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) { + // Only scrub pipeline for RATIS THREE pipeline + return; + } + Long currentTime = System.currentTimeMillis(); + Long pipelineScrubTimeoutInMills = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor, + Pipeline.PipelineState.ALLOCATED).stream() + .filter(p -> 
(currentTime - p.getCreationTimestamp() + >= pipelineScrubTimeoutInMills)) + .collect(Collectors.toList()); + for (Pipeline p : needToSrubPipelines) { + LOG.info("scrubbing pipeline: id: " + p.getId().toString() + + " since it stays at ALLOCATED stage for " + + (currentTime - p.getCreationTimestamp())/60000 + " mins."); + finalizeAndDestroyPipeline(p, false); + } + } + + @Override public Map<String, Integer> getPipelineInfo() { final Map<String, Integer> pipelineInfo = new HashMap<>(); for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index 25b0adc..7513cad 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -33,6 +33,15 @@ import java.util.List; */ public class MockRatisPipelineProvider extends RatisPipelineProvider { + private boolean autoOpenPipeline; + + public MockRatisPipelineProvider(NodeManager nodeManager, + PipelineStateManager stateManager, + Configuration conf, boolean autoOpen) { + super(nodeManager, stateManager, conf, null); + autoOpenPipeline = autoOpen; + } + public MockRatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, Configuration conf) { @@ -43,6 +52,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider { PipelineStateManager stateManager, Configuration conf, EventPublisher eventPublisher) { super(nodeManager, stateManager, conf, eventPublisher); + autoOpenPipeline = true; } protected void initializePipeline(Pipeline pipeline) throws IOException { @@ -50,6 +60,24 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider { } @Override + public Pipeline 
create(HddsProtos.ReplicationFactor factor) + throws IOException { + if (autoOpenPipeline) { + return super.create(factor); + } else { + Pipeline initialPipeline = super.create(factor); + return Pipeline.newBuilder() + .setId(initialPipeline.getId()) + // overwrite pipeline state to maintain ALLOCATED + .setState(Pipeline.PipelineState.ALLOCATED) + .setType(initialPipeline.getType()) + .setFactor(factor) + .setNodes(initialPipeline.getNodes()) + .build(); + } + } + + @Override public void shutdown() { // Do nothing. } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index 065b08b..cf43acc 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -53,9 +54,11 @@ public class TestRatisPipelineProvider { @Before public void init() throws Exception { nodeManager = new MockNodeManager(true, 10); - stateManager = new PipelineStateManager(new OzoneConfiguration()); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1); + stateManager = new PipelineStateManager(conf); provider = new MockRatisPipelineProvider(nodeManager, - stateManager, new OzoneConfiguration()); + stateManager, conf); } private void createPipelineAndAssertions( @@ -64,6 
+67,7 @@ public class TestRatisPipelineProvider { assertPipelineProperties(pipeline, factor, REPLICATION_TYPE, Pipeline.PipelineState.ALLOCATED); stateManager.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); Pipeline pipeline1 = provider.create(factor); assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE, @@ -149,6 +153,9 @@ public class TestRatisPipelineProvider { Pipeline pipeline = provider.create(factor); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE, Pipeline.PipelineState.ALLOCATED); + nodeManager.addPipeline(pipeline); + stateManager.addPipeline(pipeline); + List<DatanodeDetails> nodes = pipeline.getNodes(); @@ -184,5 +191,6 @@ public class TestRatisPipelineProvider { .build(); stateManager.addPipeline(openPipeline); + nodeManager.addPipeline(openPipeline); } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 08f5185..2df851d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; @@ -28,6 +29,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; @@ -270,7 +272,7 @@ public class TestSCMPipelineManager { 
numPipelineCreateFailed = getLongCounter( "NumPipelineCreationFailed", metrics); Assert.assertTrue(numPipelineCreateFailed == 1); - + // clean up pipelineManager.close(); } @@ -374,6 +376,45 @@ public class TestSCMPipelineManager { pipelineManager.close(); } + @Test + public void testScrubPipeline() throws IOException { + // No timeout for pipeline scrubber. + conf.setTimeDuration( + OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, + TimeUnit.MILLISECONDS); + + final SCMPipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + final PipelineProvider ratisProvider = new MockRatisPipelineProvider( + nodeManager, pipelineManager.getStateManager(), conf, false); + + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + ratisProvider); + + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + // At this point, pipeline is not at OPEN stage. + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); + + // pipeline should be seen in pipelineManager as ALLOCATED. + Assert.assertTrue(pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.ALLOCATED).contains(pipeline)); + pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + + // pipeline should be scrubbed. 
+ Assert.assertFalse(pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.ALLOCATED).contains(pipeline)); + + pipelineManager.close(); + } + private void sendPipelineReport(DatanodeDetails dn, Pipeline pipeline, PipelineReportHandler pipelineReportHandler, boolean isLeader, EventQueue eventQueue) { --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org