This is an automated email from the ASF dual-hosted git repository. licheng pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push: new bc68c48 HDDS-3693 Switch to new StateManager interface. (#1007) bc68c48 is described below commit bc68c4843a84b9738c8f3e1bd3c47eaf47d4f12f Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Wed Jun 3 11:34:21 2020 +0800 HDDS-3693 Switch to new StateManager interface. (#1007) Co-authored-by: Li Cheng <timmych...@tencent.com> --- .../hadoop/hdds/scm/pipeline/PipelineFactory.java | 2 +- .../hdds/scm/pipeline/PipelineManagerV2Impl.java | 22 ++++-- .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 5 +- .../hadoop/hdds/scm/pipeline/PipelineProvider.java | 6 +- .../hdds/scm/pipeline/PipelineStateManager.java | 80 ++++++++++++++++---- .../scm/pipeline/PipelineStateManagerV2Impl.java | 52 +++++++++++-- .../hdds/scm/pipeline/RatisPipelineProvider.java | 4 +- .../hdds/scm/pipeline/RatisPipelineUtils.java | 2 +- .../hdds/scm/pipeline/SCMPipelineManager.java | 4 +- .../hdds/scm/pipeline/SimplePipelineProvider.java | 2 +- ...pelineStateManagerV2.java => StateManager.java} | 23 +++++- .../scm/pipeline/MockRatisPipelineProvider.java | 20 ++--- .../hdds/scm/pipeline/TestPipelieManagerImpl.java | 87 ++++++++++++++++++++++ .../TestPipelineDatanodesIntersection.java | 2 +- .../scm/pipeline/TestPipelineStateManager.java | 2 +- .../ozone/recon/scm/ReconPipelineManager.java | 3 +- 16 files changed, 262 insertions(+), 54 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index e1cf382..bdd5053 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -39,7 +39,7 @@ public class PipelineFactory { private Map<ReplicationType, PipelineProvider> providers; - PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager, + PipelineFactory(NodeManager nodeManager, StateManager stateManager, ConfigurationSource conf, EventPublisher eventPublisher) { providers = new HashMap<>(); providers.put(ReplicationType.STAND_ALONE, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java index a6a3249..f451000 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java @@ -58,13 +58,13 @@ import java.util.stream.Collectors; * All the write operations for pipelines must come via PipelineManager. * It synchronises all write and read operations via a ReadWriteLock. */ -public class PipelineManagerV2Impl implements PipelineManager { +public final class PipelineManagerV2Impl implements PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(SCMPipelineManager.class); private final ReadWriteLock lock; private PipelineFactory pipelineFactory; - private PipelineStateManagerV2 stateManager; + private StateManager stateManager; private Scheduler scheduler; private BackgroundPipelineCreator backgroundPipelineCreator; private final ConfigurationSource conf; @@ -77,9 +77,9 @@ public class PipelineManagerV2Impl implements PipelineManager { // to prevent pipelines being created until sufficient nodes have registered. private final AtomicBoolean pipelineCreationAllowed; - public PipelineManagerV2Impl(ConfigurationSource conf, + private PipelineManagerV2Impl(ConfigurationSource conf, NodeManager nodeManager, - PipelineStateManagerV2 pipelineStateManager, + StateManager pipelineStateManager, PipelineFactory pipelineFactory) { this.lock = new ReentrantReadWriteLock(); this.pipelineFactory = pipelineFactory; @@ -100,17 +100,20 @@ public class PipelineManagerV2Impl implements PipelineManager { this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get()); } - public static PipelineManager newPipelineManager( + public static PipelineManagerV2Impl newPipelineManager( ConfigurationSource conf, SCMHAManager scmhaManager, NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore, - PipelineFactory pipelineFactory) throws IOException { + EventPublisher eventPublisher) throws IOException { // Create PipelineStateManager - PipelineStateManagerV2 stateManager = PipelineStateManagerV2Impl + StateManager stateManager = PipelineStateManagerV2Impl .newBuilder().setPipelineStore(pipelineStore) .setRatisServer(scmhaManager.getRatisServer()) .setNodeManager(nodeManager) .build(); + // Create PipelineFactory + PipelineFactory pipelineFactory = new PipelineFactory( + nodeManager, stateManager, conf, eventPublisher); // Create PipelineManager PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf, nodeManager, stateManager, pipelineFactory); @@ -572,6 +575,11 @@ public class PipelineManagerV2Impl implements PipelineManager { return pipelineCreationAllowed.get(); } + @VisibleForTesting + public void allowPipelineCreation() { + this.pipelineCreationAllowed.set(true); + } + private void setBackgroundPipelineCreator( BackgroundPipelineCreator backgroundPipelineCreator) { this.backgroundPipelineCreator = backgroundPipelineCreator; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java index 524b5ec..6573271 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java @@ -51,7 +51,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { static final Logger LOG = LoggerFactory.getLogger(PipelinePlacementPolicy.class); private final NodeManager nodeManager; - private final PipelineStateManager stateManager; + private final StateManager stateManager; private final ConfigurationSource conf; private final int heavyNodeCriteria; private static final int REQUIRED_RACKS = 2; @@ -65,7 +65,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { * @param conf Configuration */ public PipelinePlacementPolicy(final NodeManager nodeManager, - final PipelineStateManager stateManager, final ConfigurationSource conf) { + final StateManager stateManager, + final ConfigurationSource conf) { super(nodeManager, conf); this.nodeManager = nodeManager; this.conf = conf; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java index 533f77e..576d415 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java @@ -37,10 +37,10 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; public abstract class PipelineProvider { private final NodeManager nodeManager; - private final PipelineStateManager stateManager; + private final StateManager stateManager; public PipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager) { + StateManager stateManager) { this.nodeManager = nodeManager; this.stateManager = stateManager; } @@ -54,7 +54,7 @@ public abstract class PipelineProvider { return nodeManager; } - public PipelineStateManager getPipelineStateManager() { + public StateManager getPipelineStateManager() { return stateManager; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index bb56a03..de6f186 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -38,7 +39,7 @@ import java.util.NavigableSet; * state. All the read and write operations in PipelineStateMap are protected * by a read write lock. */ -public class PipelineStateManager { +public class PipelineStateManager implements StateManager { private static final Logger LOG = LoggerFactory.getLogger(PipelineStateManager.class); @@ -48,71 +49,90 @@ public class PipelineStateManager { public PipelineStateManager() { this.pipelineStateMap = new PipelineStateMap(); } - + @Override public void addPipeline(Pipeline pipeline) throws IOException { pipelineStateMap.addPipeline(pipeline); LOG.info("Created pipeline {}", pipeline); } - void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID) + @Override + public void addContainerToPipeline(PipelineID pipelineId, + ContainerID containerID) throws IOException { pipelineStateMap.addContainerToPipeline(pipelineId, containerID); } - Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException { + @Override + public Pipeline getPipeline(PipelineID pipelineID) + throws PipelineNotFoundException { return pipelineStateMap.getPipeline(pipelineID); } + @Override public List<Pipeline> getPipelines() { return pipelineStateMap.getPipelines(); } - List<Pipeline> getPipelines(ReplicationType type) { + @Override + public List<Pipeline> getPipelines(ReplicationType type) { return pipelineStateMap.getPipelines(type); } - List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor) { + @Override + public List<Pipeline> getPipelines(ReplicationType type, + ReplicationFactor factor) { return pipelineStateMap.getPipelines(type, factor); } - List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, + @Override + public List<Pipeline> getPipelines(ReplicationType type, + ReplicationFactor factor, PipelineState state) { return pipelineStateMap.getPipelines(type, factor, state); } - List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, + @Override + public List<Pipeline> getPipelines( + ReplicationType type, ReplicationFactor factor, PipelineState state, Collection<DatanodeDetails> excludeDns, Collection<PipelineID> excludePipelines) { return pipelineStateMap .getPipelines(type, factor, state, excludeDns, excludePipelines); } - List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) { + @Override + public List<Pipeline> getPipelines(ReplicationType type, + PipelineState... states) { return pipelineStateMap.getPipelines(type, states); } - NavigableSet<ContainerID> getContainers(PipelineID pipelineID) + @Override + public NavigableSet<ContainerID> getContainers(PipelineID pipelineID) throws IOException { return pipelineStateMap.getContainers(pipelineID); } - int getNumberOfContainers(PipelineID pipelineID) throws IOException { + @Override + public int getNumberOfContainers(PipelineID pipelineID) throws IOException { return pipelineStateMap.getNumberOfContainers(pipelineID); } - Pipeline removePipeline(PipelineID pipelineID) throws IOException { + @Override + public Pipeline removePipeline(PipelineID pipelineID) throws IOException { Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID); LOG.info("Pipeline {} removed from db", pipeline); return pipeline; } - void removeContainerFromPipeline(PipelineID pipelineID, + @Override + public void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID) throws IOException { pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID); } - Pipeline finalizePipeline(PipelineID pipelineId) - throws PipelineNotFoundException { + @Override + public Pipeline finalizePipeline(PipelineID pipelineId) + throws IOException { Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId); if (!pipeline.isClosed()) { pipeline = pipelineStateMap @@ -122,7 +142,8 @@ public class PipelineStateManager { return pipeline; } - Pipeline openPipeline(PipelineID pipelineId) throws IOException { + @Override + public Pipeline openPipeline(PipelineID pipelineId) throws IOException { Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId); if (pipeline.isClosed()) { throw new IOException("Closed pipeline can not be opened"); @@ -141,6 +162,7 @@ public class PipelineStateManager { * @param pipelineID ID of the pipeline to activate. * @throws IOException in case of any Exception */ + @Override public void activatePipeline(PipelineID pipelineID) throws IOException { pipelineStateMap @@ -153,14 +175,40 @@ public class PipelineStateManager { * @param pipelineID ID of the pipeline to deactivate. * @throws IOException in case of any Exception */ + @Override public void deactivatePipeline(PipelineID pipelineID) throws IOException { pipelineStateMap .updatePipelineState(pipelineID, PipelineState.DORMANT); } + @Override public void updatePipelineState(PipelineID id, PipelineState newState) throws PipelineNotFoundException { pipelineStateMap.updatePipelineState(id, newState); } + + @Override + public void addPipeline(HddsProtos.Pipeline pipelineProto) + throws IOException { + throw new IOException("Not supported."); + } + + @Override + public void removePipeline(HddsProtos.PipelineID pipelineIDProto) + throws IOException { + throw new IOException("Not supported."); + } + + @Override + public void updatePipelineState(HddsProtos.PipelineID pipelineIDProto, + HddsProtos.PipelineState newState) + throws IOException { + throw new IOException("Not supported."); + } + + @Override + public void close() { + // Do nothing + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java index c74dc86..703cdec 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java @@ -42,7 +42,7 @@ import java.util.NavigableSet; * state. All the read and write operations in PipelineStateMap are protected * by a read write lock. */ -public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 { +public class PipelineStateManagerV2Impl implements StateManager { private static final Logger LOG = LoggerFactory.getLogger(PipelineStateManager.class); @@ -180,6 +180,48 @@ public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 { pipelineStore.close(); } + // TODO Remove legacy + @Override + public void addPipeline(Pipeline pipeline) throws IOException { + throw new IOException("Not supported."); + } + + @Override + public Pipeline removePipeline(PipelineID pipelineID) throws IOException { + throw new IOException("Not supported."); + } + + @Override + public void updatePipelineState(PipelineID id, + Pipeline.PipelineState newState) + throws IOException { + throw new IOException("Not supported."); + } + + @Override + public Pipeline finalizePipeline(PipelineID pipelineId) + throws IOException { + throw new IOException("Not supported."); + } + + + @Override + public Pipeline openPipeline(PipelineID pipelineId) throws IOException { + throw new IOException("Not supported."); + } + + @Override + public void activatePipeline(PipelineID pipelineID) throws IOException { + throw new IOException("Not supported."); + } + + @Override + public void deactivatePipeline(PipelineID pipelineID) throws IOException { + throw new IOException("Not supported."); + } + + // legacy interfaces end + public static Builder newBuilder() { return new Builder(); } @@ -208,19 +250,19 @@ public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 { return this; } - public PipelineStateManagerV2 build() throws IOException { + public StateManager build() throws IOException { Preconditions.checkNotNull(pipelineStore); - final PipelineStateManagerV2 pipelineStateManager = + final StateManager pipelineStateManager = new PipelineStateManagerV2Impl(pipelineStore, nodeManager); final SCMHAInvocationHandler invocationHandler = new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE, pipelineStateManager, scmRatisServer); - return (PipelineStateManagerV2) Proxy.newProxyInstance( + return (StateManager) Proxy.newProxyInstance( SCMHAInvocationHandler.class.getClassLoader(), - new Class<?>[]{PipelineStateManagerV2.class}, invocationHandler); + new Class<?>[]{StateManager.class}, invocationHandler); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 4d91541..821ed30 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -54,8 +54,8 @@ public class RatisPipelineProvider extends PipelineProvider { private int maxPipelinePerDatanode; RatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, ConfigurationSource conf, - EventPublisher eventPublisher) { + StateManager stateManager, ConfigurationSource conf, + EventPublisher eventPublisher) { super(nodeManager, stateManager); this.conf = conf; this.eventPublisher = eventPublisher; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index edc40af..19a8fc5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -127,7 +127,7 @@ public final class RatisPipelineUtils { * @return list of matched pipeline */ static List<Pipeline> checkPipelineContainSameDatanodes( - PipelineStateManagerV2 stateManager, Pipeline pipeline) { + StateManager stateManager, Pipeline pipeline) { return stateManager.getPipelines( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 941ce19..71c3190 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -68,7 +68,7 @@ public class SCMPipelineManager implements PipelineManager { private final ReadWriteLock lock; private PipelineFactory pipelineFactory; - private PipelineStateManager stateManager; + private StateManager stateManager; private final BackgroundPipelineCreator backgroundPipelineCreator; private Scheduler scheduler; @@ -133,7 +133,7 @@ public class SCMPipelineManager implements PipelineManager { this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get()); } - public PipelineStateManager getStateManager() { + public StateManager getStateManager() { return stateManager; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index c7b6305..69711bb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -34,7 +34,7 @@ import java.util.List; public class SimplePipelineProvider extends PipelineProvider { public SimplePipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager) { + StateManager stateManager) { super(nodeManager, stateManager); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java similarity index 83% rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java index 4021575..3a772e5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java @@ -29,8 +29,9 @@ import java.util.NavigableSet; /** * Manages the state of pipelines in SCM. + * TODO Rename to PipelineStateManager once the old state manager is removed. */ -public interface PipelineStateManagerV2 { +public interface StateManager { /** * Adding pipeline would be replicated to Ratis. @@ -96,4 +97,24 @@ public interface PipelineStateManagerV2 { ContainerID containerID) throws IOException; void close() throws Exception; + + // TODO remove legacy interfaces once we switch to Ratis based. + + void addPipeline(Pipeline pipeline) throws IOException; + + Pipeline removePipeline(PipelineID pipelineID) throws IOException; + + void updatePipelineState(PipelineID id, Pipeline.PipelineState newState) + throws IOException; + + Pipeline finalizePipeline(PipelineID pipelineId) + throws IOException; + + Pipeline openPipeline(PipelineID pipelineId) throws IOException; + + void activatePipeline(PipelineID pipelineID) + throws IOException; + + void deactivatePipeline(PipelineID pipelineID) + throws IOException; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index f9fb150..e355877 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -36,29 +36,31 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider { private boolean autoOpenPipeline; private boolean isHealthy; - public MockRatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, ConfigurationSource conf, - EventPublisher eventPublisher, boolean autoOpen) { + public MockRatisPipelineProvider( + NodeManager nodeManager, StateManager stateManager, + ConfigurationSource conf, EventPublisher eventPublisher, + boolean autoOpen) { super(nodeManager, stateManager, conf, eventPublisher); autoOpenPipeline = autoOpen; } public MockRatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, + StateManager stateManager, ConfigurationSource conf) { super(nodeManager, stateManager, conf, new EventQueue()); } public MockRatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, - ConfigurationSource conf, boolean isHealthy) { + StateManager stateManager, + ConfigurationSource conf, + boolean isHealthy) { super(nodeManager, stateManager, conf, new EventQueue()); this.isHealthy = isHealthy; } - public MockRatisPipelineProvider(NodeManager nodeManager, - PipelineStateManager stateManager, ConfigurationSource conf, - EventPublisher eventPublisher) { + public MockRatisPipelineProvider( + NodeManager nodeManager, StateManager stateManager, + ConfigurationSource conf, EventPublisher eventPublisher) { super(nodeManager, stateManager, conf, eventPublisher); autoOpenPipeline = true; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java new file mode 100644 index 0000000..a2a8e25 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl; +import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; +import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.UUID; + +/** + * Tests for PipelineManagerImpl. + */ +public class TestPipelieManagerImpl { + private PipelineManagerV2Impl pipelineManager; + private File testDir; + private DBStore dbStore; + + @Before + public void init() throws Exception { + final OzoneConfiguration conf = SCMTestUtils.getConf(); + testDir = GenericTestUtils.getTestDir( + TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID()); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition()); + pipelineManager = PipelineManagerV2Impl.newPipelineManager( + conf, MockSCMHAManager.getInstance(), + new MockNodeManager(true, 20), + SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue()); + } + + @After + public void cleanup() throws Exception { + if (pipelineManager != null) { + pipelineManager.close(); + } + if (dbStore != null) { + dbStore.close(); + } + FileUtil.fullyDelete(testDir); + } + + @Test + public void testCreatePipeline() throws Exception { + Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); + pipelineManager.allowPipelineCreation(); + Pipeline pipeline1 = pipelineManager.createPipeline( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); + Assert.assertEquals(1, pipelineManager.getPipelines().size()); + Assert.assertTrue(pipelineManager.containsPipeline(pipeline1.getId())); + + Pipeline pipeline2 = pipelineManager.createPipeline( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE); + Assert.assertEquals(2, pipelineManager.getPipelines().size()); + Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId())); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java index 41eea3d..3320081 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java @@ -80,7 +80,7 @@ public class TestPipelineDatanodesIntersection { NodeManager nodeManager= new MockNodeManager(true, nodeCount); conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, nodeHeaviness); conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); - PipelineStateManager stateManager = new PipelineStateManager(); + StateManager stateManager = new PipelineStateManager(); PipelineProvider provider = new MockRatisPipelineProvider(nodeManager, stateManager, conf); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index 6bff581..8252e2c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -37,7 +37,7 @@ import java.util.Set; */ public class TestPipelineStateManager { - private PipelineStateManager stateManager; + private StateManager stateManager; @Before public void init() throws Exception { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java index a8dd3c9..beed591 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -118,7 +117,7 @@ public class ReconPipelineManager extends SCMPipelineManager { if (!p.getPipelineState().equals(CLOSED)) { try { getStateManager().updatePipelineState(pipelineID, CLOSED); - } catch (PipelineNotFoundException e) { + } catch (IOException e) { LOG.warn("Pipeline {} not found while updating state. ", p.getId(), e); } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org