This is an automated email from the ASF dual-hosted git repository. sammichen pushed a commit to branch HDDS-1564 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1564 by this push: new c855528 HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline. (#237) c855528 is described below commit c85552817fe62feceb3bb47b843b720903996ac8 Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Wed Nov 27 20:18:50 2019 +0800 HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline. (#237) --- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 ++++ .../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 29 +++++++++++++++ hadoop-hdds/common/src/main/proto/hdds.proto | 1 + .../common/src/main/resources/ozone-default.xml | 12 ++++++ .../scm/pipeline/BackgroundPipelineCreator.java | 7 +++- .../hadoop/hdds/scm/pipeline/PipelineManager.java | 3 ++ .../hdds/scm/pipeline/SCMPipelineManager.java | 26 +++++++++++++ .../scm/safemode/HealthyPipelineSafeModeRule.java | 1 - .../scm/pipeline/MockRatisPipelineProvider.java | 28 ++++++++++++++ .../scm/pipeline/TestRatisPipelineProvider.java | 11 +++++- .../hdds/scm/pipeline/TestSCMPipelineManager.java | 43 +++++++++++++++++++++- .../hadoop/ozone/TestContainerOperations.java | 2 - 12 files changed, 163 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 17e09c1..4066661 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -333,6 +333,13 @@ public final class ScmConfigKeys { OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY = "ozone.scm.keyvalue.container.deletion-choosing.policy"; + // Max timeout for pipeline to stay at ALLOCATED state before scrubbed. + public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT = + "ozone.scm.pipeline.allocated.timeout"; + + public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT = + "5m"; + public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT = "ozone.scm.container.creation.lease.timeout"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 47ec453..54d752f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -55,6 +55,8 @@ public final class Pipeline { private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>(); // Current reported Leader for the pipeline private UUID leaderId; + // Timestamp for pipeline upon creation + private Long creationTimestamp; /** * The immutable properties of pipeline object is used in @@ -69,6 +71,7 @@ public final class Pipeline { this.factor = factor; this.state = state; this.nodeStatus = nodeStatus; + this.creationTimestamp = System.currentTimeMillis(); } /** @@ -108,6 +111,24 @@ public final class Pipeline { } /** + * Return the creation time of pipeline. + * + * @return Creation Timestamp + */ + public Long getCreationTimestamp() { + return creationTimestamp; + } + + /** + * Set the creation timestamp. Only for protobuf now. + * + * @param creationTimestamp + */ + void setCreationTimestamp(Long creationTimestamp) { + this.creationTimestamp = creationTimestamp; + } + + /** * Return the pipeline leader's UUID. * * @return DatanodeDetails.UUID. @@ -196,6 +217,7 @@ public final class Pipeline { .setFactor(factor) .setState(PipelineState.getProtobuf(state)) .setLeaderID(leaderId != null ? leaderId.toString() : "") + .setCreationTimeStamp(creationTimestamp) .addAllMembers(nodeStatus.keySet().stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())); @@ -274,6 +296,7 @@ public final class Pipeline { b.append(", Type:").append(getType()); b.append(", Factor:").append(getFactor()); b.append(", State:").append(getPipelineState()); + b.append(", CreationTimestamp").append(getCreationTimestamp()); b.append("]"); return b.toString(); } @@ -298,6 +321,7 @@ public final class Pipeline { private List<Integer> nodeOrder = null; private List<DatanodeDetails> nodesInOrder = null; private UUID leaderId = null; + private Long creationTimestamp = null; public Builder() {} @@ -309,6 +333,7 @@ public final class Pipeline { this.nodeStatus = pipeline.nodeStatus; this.nodesInOrder = pipeline.nodesInOrder.get(); this.leaderId = pipeline.getLeaderId(); + this.creationTimestamp = pipeline.getCreationTimestamp(); } public Builder setId(PipelineID id1) { @@ -355,6 +380,10 @@ public final class Pipeline { Preconditions.checkNotNull(nodeStatus); Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus); pipeline.setLeaderId(leaderId); + // overwrite with original creationTimestamp + if (creationTimestamp != null) { + pipeline.setCreationTimestamp(creationTimestamp); + } if (nodeOrder != null && !nodeOrder.isEmpty()) { // This branch is for build from ProtoBuf diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 39a01dc..b313604 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -75,6 +75,7 @@ message Pipeline { required PipelineID id = 5; optional string leaderID = 6; repeated uint32 memberOrders = 7; + optional uint64 creationTimeStamp = 8; } message KeyValue { diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 909c692..bd7dda6 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -857,6 +857,18 @@ </description> </property> <property> + <name>ozone.scm.pipeline.allocated.timeout</name> + <value>5m</value> + <tag>OZONE, SCM, PIPELINE</tag> + <description> + Timeout for every pipeline to stay in ALLOCATED stage. When pipeline is created, + it should be at OPEN stage once pipeline report is successfully received by SCM. + If a pipeline stays at ALLOCATED for too long, it should be scrubbed so that new + pipeline can be created. This timeout is for how long pipeline can stay at ALLOCATED + stage until it gets scrubbed. + </description> + </property> + <property> <name>ozone.scm.container.size</name> <value>5GB</value> <tag>OZONE, PERFORMANCE, MANAGEMENT</tag> diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java index 6952f74..9bfd87a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java @@ -91,12 +91,17 @@ class BackgroundPipelineCreator { for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor .values()) { + try { + pipelineManager.scrubPipeline(type, factor); + } catch (IOException e) { + LOG.error("Error while scrubbing pipelines {}", e); + } + while (true) { try { if (scheduler.isClosed()) { break; } - pipelineManager.createPipeline(type, factor); } catch (IOException ioe) { break; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 9ba5f31..adbe442 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -73,6 +73,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean { void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout) throws IOException; + void scrubPipeline(ReplicationType type, ReplicationFactor factor) + throws IOException; + void startPipelineCreator(); void triggerPipelineCreation(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index b41c595..9fd1cd0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -53,6 +53,7 @@ import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB; @@ -323,6 +324,31 @@ public class SCMPipelineManager implements PipelineManager { } @Override + public void scrubPipeline(ReplicationType type, ReplicationFactor factor) + throws IOException{ + if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) { + // Only srub pipeline for RATIS THREE pipeline + return; + } + Long currentTime = System.currentTimeMillis(); + Long pipelineScrubTimeoutInMills = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor, + Pipeline.PipelineState.ALLOCATED).stream() + .filter(p -> (currentTime - p.getCreationTimestamp() + >= pipelineScrubTimeoutInMills)) + .collect(Collectors.toList()); + for (Pipeline p : needToSrubPipelines) { + LOG.info("srubbing pipeline: id: " + p.getId().toString() + + " since it stays at ALLOCATED stage for " + + (currentTime - p.getCreationTimestamp())/60000 + " mins."); + finalizeAndDestroyPipeline(p, false); + } + } + + @Override public Map<String, Integer> getPipelineInfo() { final Map<String, Integer> pipelineInfo = new HashMap<>(); for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java index 3b31454..4672c23 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java @@ -147,7 +147,6 @@ public class HealthyPipelineSafeModeRule getSafeModeMetrics().incCurrentHealthyPipelinesCount(); processedPipelineIDs.add(pipelineID); } - } if (scmInSafeMode()) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index 342ee5b..0ed3f16 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -31,10 +31,20 @@ import java.util.List; */ public class MockRatisPipelineProvider extends RatisPipelineProvider { + private boolean autoOpenPipeline; + + public MockRatisPipelineProvider(NodeManager nodeManager, + PipelineStateManager stateManager, + Configuration conf, boolean autoOpen) { + super(nodeManager, stateManager, conf, null); + autoOpenPipeline = autoOpen; + } + public MockRatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, Configuration conf) { super(nodeManager, stateManager, conf, null); + autoOpenPipeline = true; } protected void initializePipeline(Pipeline pipeline) throws IOException { @@ -42,6 +52,24 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider { } @Override + public Pipeline create(HddsProtos.ReplicationFactor factor) + throws IOException { + if (autoOpenPipeline) { + return super.create(factor); + } else { + Pipeline initialPipeline = super.create(factor); + return Pipeline.newBuilder() + .setId(initialPipeline.getId()) + // overwrite pipeline state to main ALLOCATED + .setState(Pipeline.PipelineState.ALLOCATED) + .setType(initialPipeline.getType()) + .setFactor(factor) + .setNodes(initialPipeline.getNodes()) + .build(); + } + } + + @Override public void shutdown() { // Do nothing. } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index 6f0425d..b64f338 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -53,9 +54,11 @@ public class TestRatisPipelineProvider { @Before public void init() throws Exception { nodeManager = new MockNodeManager(true, 10); - stateManager = new PipelineStateManager(new OzoneConfiguration()); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1); + stateManager = new PipelineStateManager(conf); provider = new MockRatisPipelineProvider(nodeManager, - stateManager, new OzoneConfiguration()); + stateManager, conf); } private void createPipelineAndAssertions( @@ -63,6 +66,7 @@ public class TestRatisPipelineProvider { Pipeline pipeline = provider.create(factor); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE); stateManager.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); Pipeline pipeline1 = provider.create(factor); assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE); @@ -142,6 +146,8 @@ public class TestRatisPipelineProvider { // only 2 healthy DNs left that are not part of any pipeline Pipeline pipeline = provider.create(factor); assertPipelineProperties(pipeline, factor, REPLICATION_TYPE); + nodeManager.addPipeline(pipeline); + stateManager.addPipeline(pipeline); List<DatanodeDetails> nodes = pipeline.getNodes(); @@ -176,5 +182,6 @@ public class TestRatisPipelineProvider { .build(); stateManager.addPipeline(openPipeline); + nodeManager.addPipeline(openPipeline); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 7aba39a..0ee1721 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; @@ -28,6 +29,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; @@ -205,7 +207,7 @@ public class TestSCMPipelineManager { numPipelineCreateFailed = getLongCounter( "NumPipelineCreationFailed", metrics); Assert.assertTrue(numPipelineCreateFailed == 1); - + // clean up pipelineManager.close(); } @@ -309,6 +311,45 @@ public class TestSCMPipelineManager { pipelineManager.close(); } + @Test + public void testScrubPipeline() throws IOException { + // No timeout for pipeline scrubber. + conf.setTimeDuration( + OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, + TimeUnit.MILLISECONDS); + + final SCMPipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + final PipelineProvider ratisProvider = new MockRatisPipelineProvider( + nodeManager, pipelineManager.getStateManager(), conf, false); + + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + ratisProvider); + + Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + // At this point, pipeline is not at OPEN stage. + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.ALLOCATED); + + // pipeline should be seen in pipelineManager as ALLOCATED. + Assert.assertTrue(pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.ALLOCATED).contains(pipeline)); + pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + + // pipeline should be scrubbed. + Assert.assertFalse(pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.ALLOCATED).contains(pipeline)); + + pipelineManager.close(); + } + private void sendPipelineReport(DatanodeDetails dn, Pipeline pipeline, PipelineReportHandler pipelineReportHandler, boolean isLeader, EventQueue eventQueue) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java index 6f347cf..50429cc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.ozone; -import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -30,7 +29,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE; import static org.junit.Assert.assertEquals; /** --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org