HDDS-238. Add Node2Pipeline Map in SCM to track ratis/standalone pipelines. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f3f7222 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f3f7222 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f3f7222 Branch: refs/heads/HDFS-12943 Commit: 3f3f72221ffd11cc6bfa0e010e3c5b0e14911102 Parents: f89e265 Author: Xiaoyu Yao <x...@apache.org> Authored: Thu Jul 12 22:02:57 2018 -0700 Committer: Xiaoyu Yao <x...@apache.org> Committed: Thu Jul 12 22:14:03 2018 -0700 ---------------------------------------------------------------------- .../container/common/helpers/ContainerInfo.java | 11 ++ .../hdds/scm/container/ContainerMapping.java | 11 +- .../scm/container/ContainerStateManager.java | 6 + .../scm/container/states/ContainerStateMap.java | 36 +++++- .../hdds/scm/pipelines/Node2PipelineMap.java | 121 +++++++++++++++++++ .../hdds/scm/pipelines/PipelineManager.java | 22 ++-- .../hdds/scm/pipelines/PipelineSelector.java | 24 +++- .../scm/pipelines/ratis/RatisManagerImpl.java | 11 +- .../standalone/StandaloneManagerImpl.java | 7 +- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 117 ++++++++++++++++++ 10 files changed, 343 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java index 9593717..4074b21 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java @@ -456,4 +456,15 @@ public class ContainerInfo implements Comparator<ContainerInfo>, replicationFactor, replicationType); } } + + /** + * Check if a container is in open state, this will check if the + * container is either open or allocated or creating. Any containers in + * these states is managed as an open container by SCM. + */ + public boolean isContainerOpen() { + return state == HddsProtos.LifeCycleState.ALLOCATED || + state == HddsProtos.LifeCycleState.CREATING || + state == HddsProtos.LifeCycleState.OPEN; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index abad32c..26f4d86 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -477,7 +477,7 @@ public class ContainerMapping implements Mapping { List<StorageContainerDatanodeProtocolProtos.ContainerInfo> containerInfos = reports.getReportsList(); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : + for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); lock.lock(); @@ -498,7 +498,9 @@ public class ContainerMapping implements Mapping { containerStore.put(dbKey, newState.toByteArray()); // If the container is closed, then state is already written to SCM - Pipeline pipeline = pipelineSelector.getPipeline(newState.getPipelineName(), newState.getReplicationType()); + Pipeline pipeline = + pipelineSelector.getPipeline(newState.getPipelineName(), + newState.getReplicationType()); if(pipeline == null) { pipeline = pipelineSelector .getReplicationPipeline(newState.getReplicationType(), @@ -713,4 +715,9 @@ public class ContainerMapping implements Mapping { public MetadataStore getContainerStore() { return containerStore; } + + @VisibleForTesting + public PipelineSelector getPipelineSelector() { + return pipelineSelector; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 223deac..b2431dc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.container; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -522,4 +523,9 @@ public class ContainerStateManager implements Closeable { DatanodeDetails dn) throws SCMException { return containers.removeContainerReplica(containerID, dn); } + + @VisibleForTesting + public ContainerStateMap getContainerStateMap() { + return containers; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 1c92861..46fe2ab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -51,7 +51,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes * Container State Map acts like a unified map for various attributes that are * used to select containers when we need allocated blocks. * <p> - * This class provides the ability to query 4 classes of attributes. They are + * This class provides the ability to query 5 classes of attributes. They are * <p> * 1. LifeCycleStates - LifeCycle States of container describe in which state * a container is. For example, a container needs to be in Open State for a @@ -72,6 +72,9 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes * Replica and THREE Replica. User can specify how many copies should be made * for a ozone key. * <p> + * 5.Pipeline - The pipeline constitute the set of Datanodes on which the + * open container resides physically. + * <p> * The most common access pattern of this class is to select a container based * on all these parameters, for example, when allocating a block we will * select a container that belongs to user1, with Ratis replication which can @@ -86,6 +89,14 @@ public class ContainerStateMap { private final ContainerAttribute<String> ownerMap; private final ContainerAttribute<ReplicationFactor> factorMap; private final ContainerAttribute<ReplicationType> typeMap; + // This map constitutes the pipeline to open container mappings. + // This map will be queried for the list of open containers on a particular + // pipeline and issue a close on corresponding containers in case of + // following events: + //1. Dead datanode. + //2. Datanode out of space. + //3. Volume loss or volume out of space. + private final ContainerAttribute<String> openPipelineMap; private final Map<ContainerID, ContainerInfo> containerMap; // Map to hold replicas of given container. @@ -106,6 +117,7 @@ public class ContainerStateMap { ownerMap = new ContainerAttribute<>(); factorMap = new ContainerAttribute<>(); typeMap = new ContainerAttribute<>(); + openPipelineMap = new ContainerAttribute<>(); containerMap = new HashMap<>(); autoLock = new AutoCloseableLock(); contReplicaMap = new HashMap<>(); @@ -140,6 +152,9 @@ public class ContainerStateMap { ownerMap.insert(info.getOwner(), id); factorMap.insert(info.getReplicationFactor(), id); typeMap.insert(info.getReplicationType(), id); + if (info.isContainerOpen()) { + openPipelineMap.insert(info.getPipelineName(), id); + } LOG.trace("Created container with {} successfully.", id); } } @@ -329,6 +344,11 @@ public class ContainerStateMap { throw new SCMException("Updating the container map failed.", ex, FAILED_TO_CHANGE_CONTAINER_STATE); } + // In case the container is set to closed state, it needs to be removed from + // the pipeline Map. + if (newState == LifeCycleState.CLOSED) { + openPipelineMap.remove(info.getPipelineName(), id); + } } /** @@ -360,6 +380,20 @@ public class ContainerStateMap { } /** + * Returns Open containers in the SCM by the Pipeline + * + * @param pipeline - Pipeline name. + * @return NavigableSet<ContainerID> + */ + public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(String pipeline) { + Preconditions.checkNotNull(pipeline); + + try (AutoCloseableLock lock = autoLock.acquire()) { + return openPipelineMap.getCollection(pipeline); + } + } + + /** * Returns Containers by replication factor. * * @param factor - Replication Factor. http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java new file mode 100644 index 0000000..2e89616 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.pipelines; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; + +import java.util.Set; +import java.util.UUID; +import java.util.Map; +import java.util.HashSet; +import java.util.Collections; + +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .DUPLICATE_DATANODE; + + +/** + * This data structure maintains the list of pipelines which the given datanode + * is a part of. + * This information will be added whenever a new pipeline allocation happens. + * + * TODO: this information needs to be regenerated from pipeline reports on + * SCM restart + */ +public class Node2PipelineMap { + private final Map<UUID, Set<Pipeline>> dn2PipelineMap; + + /** + * Constructs a Node2PipelineMap Object. + */ + public Node2PipelineMap() { + dn2PipelineMap = new ConcurrentHashMap<>(); + } + + /** + * Returns true if this a datanode that is already tracked by + * Node2PipelineMap. + * + * @param datanodeID - UUID of the Datanode. + * @return True if this is tracked, false if this map does not know about it. + */ + private boolean isKnownDatanode(UUID datanodeID) { + Preconditions.checkNotNull(datanodeID); + return dn2PipelineMap.containsKey(datanodeID); + } + + /** + * Insert a new datanode into Node2Pipeline Map. + * + * @param datanodeID -- Datanode UUID + * @param pipelines - set of pipelines. + */ + private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines) + throws SCMException { + Preconditions.checkNotNull(pipelines); + Preconditions.checkNotNull(datanodeID); + if(dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) { + throw new SCMException("Node already exists in the map", + DUPLICATE_DATANODE); + } + } + + /** + * Removes datanode Entry from the map. + * @param datanodeID - Datanode ID. + */ + public synchronized void removeDatanode(UUID datanodeID) { + Preconditions.checkNotNull(datanodeID); + dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null); + } + + /** + * Returns null if there no pipelines associated with this datanode ID. + * + * @param datanode - UUID + * @return Set of pipelines or Null. + */ + public Set<Pipeline> getPipelines(UUID datanode) { + Preconditions.checkNotNull(datanode); + return dn2PipelineMap.computeIfPresent(datanode, (k, v) -> + Collections.unmodifiableSet(v)); + } + +/** + * Adds a pipeline entry to a given dataNode in the map. + * @param pipeline Pipeline to be added + */ + public synchronized void addPipeline(Pipeline pipeline) throws SCMException { + for (DatanodeDetails details : pipeline.getDatanodes().values()) { + UUID dnId = details.getUuid(); + dn2PipelineMap + .computeIfAbsent(dnId,k->Collections.synchronizedSet(new HashSet<>())) + .add(pipeline); + } + } + + public Map<UUID, Set<Pipeline>> getDn2PipelineMap() { + return Collections.unmodifiableMap(dn2PipelineMap); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index a1fbce6..a041973 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -40,11 +40,13 @@ public abstract class PipelineManager { private final List<Pipeline> activePipelines; private final Map<String, Pipeline> activePipelineMap; private final AtomicInteger pipelineIndex; + private final Node2PipelineMap node2PipelineMap; - public PipelineManager() { + public PipelineManager(Node2PipelineMap map) { activePipelines = new LinkedList<>(); pipelineIndex = new AtomicInteger(0); activePipelineMap = new WeakHashMap<>(); + node2PipelineMap = map; } /** @@ -66,24 +68,23 @@ public abstract class PipelineManager { * * 2. This allows all nodes to part of a pipeline quickly. * - * 3. if there are not enough free nodes, return conduits in a + * 3. if there are not enough free nodes, return pipeline in a * round-robin fashion. * * TODO: Might have to come up with a better algorithm than this. - * Create a new placement policy that returns conduits in round robin + * Create a new placement policy that returns pipelines in round robin * fashion. */ - Pipeline pipeline = - allocatePipeline(replicationFactor); + Pipeline pipeline = allocatePipeline(replicationFactor); if (pipeline != null) { LOG.debug("created new pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", pipeline.getPipelineName(), replicationType, replicationFactor); activePipelines.add(pipeline); activePipelineMap.put(pipeline.getPipelineName(), pipeline); + node2PipelineMap.addPipeline(pipeline); } else { - pipeline = - findOpenPipeline(replicationType, replicationFactor); + pipeline = findOpenPipeline(replicationType, replicationFactor); if (pipeline != null) { LOG.debug("re-used pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", @@ -133,6 +134,11 @@ public abstract class PipelineManager { public abstract Pipeline allocatePipeline( ReplicationFactor replicationFactor) throws IOException; + public void removePipeline(Pipeline pipeline) { + activePipelines.remove(pipeline); + activePipelineMap.remove(pipeline.getPipelineName()); + } + /** * Find a Pipeline that is operational. * @@ -143,7 +149,7 @@ public abstract class PipelineManager { Pipeline pipeline = null; final int sentinal = -1; if (activePipelines.size() == 0) { - LOG.error("No Operational conduits found. Returning null."); + LOG.error("No Operational pipelines found. Returning null."); return null; } int startIndex = getNextIndex(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index 3846a84..2955af5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm.pipelines; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; @@ -41,6 +40,8 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -55,7 +56,7 @@ public class PipelineSelector { private final RatisManagerImpl ratisManager; private final StandaloneManagerImpl standaloneManager; private final long containerSize; - + private final Node2PipelineMap node2PipelineMap; /** * Constructs a pipeline Selector. * @@ -69,12 +70,13 @@ public class PipelineSelector { this.containerSize = OzoneConsts.GB * this.conf.getInt( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); + node2PipelineMap = new Node2PipelineMap(); this.standaloneManager = new StandaloneManagerImpl(this.nodeManager, placementPolicy, - containerSize); + containerSize, node2PipelineMap); this.ratisManager = new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, - conf); + conf, node2PipelineMap); } /** @@ -243,4 +245,18 @@ public class PipelineSelector { .collect(Collectors.joining(","))); manager.updatePipeline(pipelineID, newDatanodes); } + + public Node2PipelineMap getNode2PipelineMap() { + return node2PipelineMap; + } + + public void removePipeline(UUID dnId) { + Set<Pipeline> pipelineChannelSet = + node2PipelineMap.getPipelines(dnId); + for (Pipeline pipelineChannel : pipelineChannelSet) { + getPipelineManager(pipelineChannel.getType()) + .removePipeline(pipelineChannel); + } + node2PipelineMap.removeDatanode(dnId); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java index 189060e..a8f8b20 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hdds.scm.pipelines.ratis; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.XceiverClientRatis; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -60,8 +60,9 @@ public class RatisManagerImpl extends PipelineManager { * @param nodeManager */ public RatisManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long size, Configuration conf) { - super(); + ContainerPlacementPolicy placementPolicy, long size, Configuration conf, + Node2PipelineMap map) { + super(map); this.conf = conf; this.nodeManager = nodeManager; ratisMembers = new HashSet<>(); @@ -89,11 +90,11 @@ public class RatisManagerImpl extends PipelineManager { ratisMembers.addAll(newNodesList); LOG.info("Allocating a new ratis pipeline of size: {}", count); // Start all channel names with "Ratis", easy to grep the logs. - String conduitName = PREFIX + + String pipelineName = PREFIX + UUID.randomUUID().toString().substring(PREFIX.length()); Pipeline pipeline= PipelineSelector.newPipelineFromNodes(newNodesList, - LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName); + LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName); try (XceiverClientRatis client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { client.createPipeline(pipeline.getPipelineName(), newNodesList); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java index 579a3a2..cf691bf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -17,11 +17,11 @@ package org.apache.hadoop.hdds.scm.pipelines.standalone; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -58,8 +58,9 @@ public class StandaloneManagerImpl extends PipelineManager { * @param containerSize - Container Size. */ public StandaloneManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long containerSize) { - super(); + ContainerPlacementPolicy placementPolicy, long containerSize, + Node2PipelineMap map) { + super(map); this.nodeManager = nodeManager; this.placementPolicy = placementPolicy; this.containerSize = containerSize; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f3f7222/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java new file mode 100644 index 0000000..bc3505f --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers + .ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ReplicationType.RATIS; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ReplicationFactor.THREE; + +public class TestNode2PipelineMap { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static StorageContainerManager scm; + private static ContainerWithPipeline ratisContainer; + private static ContainerStateMap stateMap; + private static ContainerMapping mapping; + + /** + * Create a MiniDFSCluster for testing. + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); + cluster.waitForClusterToBeReady(); + scm = cluster.getStorageContainerManager(); + mapping = (ContainerMapping)scm.getScmContainerManager(); + stateMap = mapping.getStateManager().getContainerStateMap(); + ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner"); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + + @Test + public void testPipelineMap() throws IOException { + + NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline( + ratisContainer.getPipeline().getPipelineName()); + + long cId = ratisContainer.getContainerInfo().getContainerID(); + Assert.assertEquals(1, set.size()); + Assert.assertEquals(cId, set.first().getId()); + + List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines(); + Assert.assertEquals(3, dns.size()); + + // get pipeline details by dnid + Set<Pipeline> pipelines = mapping.getPipelineSelector() + .getNode2PipelineMap().getPipelines(dns.get(0).getUuid()); + Assert.assertEquals(1, pipelines.size()); + pipelines.forEach(p -> Assert.assertEquals(p.getPipelineName(), + ratisContainer.getPipeline().getPipelineName())); + + + // Now close the container and it should not show up while fetching + // containers by pipeline + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE); + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED); + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE); + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE); + NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline( + ratisContainer.getPipeline().getPipelineName()); + Assert.assertEquals(0, set2.size()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org