HDDS-277. PipelineStateMachine should handle closure of pipelines in SCM. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd31cb6c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd31cb6c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd31cb6c Branch: refs/heads/YARN-3409 Commit: fd31cb6cfeef0c7e9bb0a054cb0f78853df8976f Parents: be150a1 Author: Xiaoyu Yao <x...@apache.org> Authored: Thu Jul 26 13:15:27 2018 -0700 Committer: Xiaoyu Yao <x...@apache.org> Committed: Thu Jul 26 13:15:55 2018 -0700 ---------------------------------------------------------------------- .../container/common/helpers/ContainerInfo.java | 7 +- .../container/CloseContainerEventHandler.java | 28 ++-- .../hdds/scm/container/ContainerMapping.java | 16 +- .../scm/container/ContainerStateManager.java | 11 ++ .../scm/container/states/ContainerStateMap.java | 2 +- .../hdds/scm/pipelines/Node2PipelineMap.java | 33 ++-- .../hdds/scm/pipelines/PipelineManager.java | 31 ++-- .../hdds/scm/pipelines/PipelineSelector.java | 70 +++++++-- .../scm/pipelines/ratis/RatisManagerImpl.java | 14 +- .../standalone/StandaloneManagerImpl.java | 13 +- .../scm/server/StorageContainerManager.java | 2 +- .../hadoop/hdds/scm/block/TestBlockManager.java | 4 +- .../TestCloseContainerEventHandler.java | 13 +- .../scm/container/TestContainerMapping.java | 4 +- .../container/closer/TestContainerCloser.java | 4 +- .../hdds/scm/node/TestContainerPlacement.java | 3 +- .../hdds/scm/pipeline/TestPipelineClose.java | 152 +++++++++++++++++++ .../hadoop/ozone/scm/TestContainerSQLCli.java | 4 +- 18 files changed, 331 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java index 4074b21..b194c14 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java @@ -459,12 +459,13 @@ public class ContainerInfo implements Comparator<ContainerInfo>, /** * Check if a container is in open state, this will check if the - * container is either open or allocated or creating. Any containers in - * these states is managed as an open container by SCM. + * container is either open, allocated, creating or closing. + * Any containers in these states is managed as an open container by SCM. */ public boolean isContainerOpen() { return state == HddsProtos.LifeCycleState.ALLOCATED || state == HddsProtos.LifeCycleState.CREATING || - state == HddsProtos.LifeCycleState.OPEN; + state == HddsProtos.LifeCycleState.OPEN || + state == HddsProtos.LifeCycleState.CLOSING; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 859e5d5..949eb13 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; @@ -63,13 +62,13 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { containerManager.getContainerWithPipeline(containerID.getId()); info = containerWithPipeline.getContainerInfo(); if (info == null) { - LOG.info("Failed to update the container state. Container with id : {} " + LOG.error("Failed to update the container state. Container with id : {} " + "does not exist", containerID.getId()); return; } } catch (IOException e) { - LOG.info("Failed to update the container state. Container with id : {} " - + "does not exist", containerID.getId()); + LOG.error("Failed to update the container state. Container with id : {} " + + "does not exist", containerID.getId(), e); return; } @@ -85,11 +84,22 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { try { // Finalize event will make sure the state of the container transitions // from OPEN to CLOSING in containerStateManager. 
- containerManager.getStateManager() - .updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE); - } catch (SCMException ex) { - LOG.error("Failed to update the container state for container : {}" - + containerID); + containerManager.updateContainerState(containerID.getId(), + HddsProtos.LifeCycleEvent.FINALIZE); + } catch (IOException ex) { + LOG.error("Failed to update the container state to FINALIZE for" + + "container : {}" + containerID, ex); + } + } else if (info.getState() == HddsProtos.LifeCycleState.ALLOCATED) { + try { + // Create event will make sure the state of the container transitions + // from ALLOCATED to CREATING in containerStateManager, this will move + // the container out of active allocation path. + containerManager.updateContainerState(containerID.getId(), + HddsProtos.LifeCycleEvent.CREATE); + } catch (IOException ex) { + LOG.error("Failed to update the container state to CREATE for" + + "container:{}" + containerID, ex); } } else { LOG.info("container with id : {} is in {} state and need not be closed.", http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index e17fe3d..d84551a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.server.events.EventPublisher; import 
org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseException; @@ -104,7 +105,7 @@ public class ContainerMapping implements Mapping { @SuppressWarnings("unchecked") public ContainerMapping( final Configuration conf, final NodeManager nodeManager, final int - cacheSizeMB) throws IOException { + cacheSizeMB, EventPublisher eventPublisher) throws IOException { this.nodeManager = nodeManager; this.cacheSize = cacheSizeMB; this.closer = new ContainerCloser(nodeManager, conf); @@ -122,14 +123,15 @@ public class ContainerMapping implements Mapping { this.lock = new ReentrantLock(); - this.pipelineSelector = new PipelineSelector(nodeManager, conf); - // To be replaced with code getStorageSize once it is committed. size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB, OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; this.containerStateManager = new ContainerStateManager(conf, this); + this.pipelineSelector = new PipelineSelector(nodeManager, + containerStateManager, conf, eventPublisher); + this.containerCloseThreshold = conf.getFloat( ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); @@ -372,6 +374,12 @@ public class ContainerMapping implements Mapping { // Like releasing the lease in case of BEGIN_CREATE. 
ContainerInfo updatedContainer = containerStateManager .updateContainerState(containerInfo, event); + if (!updatedContainer.isContainerOpen()) { + Pipeline pipeline = pipelineSelector + .getPipeline(containerInfo.getPipelineName(), + containerInfo.getReplicationType()); + pipelineSelector.closePipelineIfNoOpenContainers(pipeline); + } containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); return updatedContainer.getState(); } catch (LeaseException e) { @@ -446,7 +454,7 @@ public class ContainerMapping implements Mapping { .getPipeline(containerInfo.getPipelineName(), containerInfo.getReplicationType()); if (pipeline == null) { - pipelineSelector + pipeline = pipelineSelector .getReplicationPipeline(containerInfo.getReplicationType(), containerInfo.getReplicationFactor()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index b2431dc..f0ab213 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -463,6 +463,17 @@ public class ContainerStateManager implements Closeable { } /** + * Returns a set of open ContainerIDs that reside on a pipeline. + * + * @param pipeline Pipeline of the Containers. + * @return Set of containers that match the specific query parameters. 
+ */ + public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(String + pipeline) { + return containers.getOpenContainerIDsByPipeline(pipeline); + } + + /** * Returns the containerInfo with pipeline for the given container id. * @param selector -- Pipeline selector class. * @param containerID id of the container http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 46fe2ab..b358b7c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -346,7 +346,7 @@ public class ContainerStateMap { } // In case the container is set to closed state, it needs to be removed from // the pipeline Map. 
- if (newState == LifeCycleState.CLOSED) { + if (!info.isContainerOpen()) { openPipelineMap.remove(info.getPipelineName(), id); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java index 2e89616..b860082 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java @@ -102,18 +102,27 @@ public class Node2PipelineMap { Collections.unmodifiableSet(v)); } -/** - * Adds a pipeline entry to a given dataNode in the map. - * @param pipeline Pipeline to be added - */ - public synchronized void addPipeline(Pipeline pipeline) throws SCMException { - for (DatanodeDetails details : pipeline.getDatanodes().values()) { - UUID dnId = details.getUuid(); - dn2PipelineMap - .computeIfAbsent(dnId,k->Collections.synchronizedSet(new HashSet<>())) - .add(pipeline); - } - } + /** + * Adds a pipeline entry to a given dataNode in the map. 
+ * @param pipeline Pipeline to be added + */ + public synchronized void addPipeline(Pipeline pipeline) { + for (DatanodeDetails details : pipeline.getDatanodes().values()) { + UUID dnId = details.getUuid(); + dn2PipelineMap + .computeIfAbsent(dnId, + k -> Collections.synchronizedSet(new HashSet<>())) + .add(pipeline); + } + } + + public synchronized void removePipeline(Pipeline pipeline) { + for (DatanodeDetails details : pipeline.getDatanodes().values()) { + UUID dnId = details.getUuid(); + dn2PipelineMap.computeIfPresent(dnId, + (k, v) -> {v.remove(pipeline); return v;}); + } + } public Map<UUID, Set<Pipeline>> getDn2PipelineMap() { return Collections.unmodifiableMap(dn2PipelineMap); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index 77d8211..266b1f3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -38,14 +38,14 @@ public abstract class PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(PipelineManager.class); private final List<Pipeline> activePipelines; - private final Map<String, Pipeline> activePipelineMap; + private final Map<String, Pipeline> pipelineMap; private final AtomicInteger pipelineIndex; private final Node2PipelineMap node2PipelineMap; public PipelineManager(Node2PipelineMap map) { activePipelines = new LinkedList<>(); pipelineIndex = new AtomicInteger(0); - activePipelineMap = new WeakHashMap<>(); + pipelineMap = new WeakHashMap<>(); node2PipelineMap = map; } @@ -85,8 +85,8 @@ 
public abstract class PipelineManager { Pipeline pipeline = null; // 1. Check if pipeline already exists - if (activePipelineMap.containsKey(pipelineName)) { - pipeline = activePipelineMap.get(pipelineName); + if (pipelineMap.containsKey(pipelineName)) { + pipeline = pipelineMap.get(pipelineName); LOG.debug("Returning pipeline for pipelineName:{}", pipelineName); return pipeline; } else { @@ -115,11 +115,6 @@ public abstract class PipelineManager { */ public abstract void initializePipeline(Pipeline pipeline) throws IOException; - public void removePipeline(Pipeline pipeline) { - activePipelines.remove(pipeline); - activePipelineMap.remove(pipeline.getPipelineName()); - } - /** * Find a Pipeline that is operational. * @@ -172,16 +167,28 @@ public abstract class PipelineManager { + "replicationType:{} replicationFactor:{}", pipeline.getPipelineName(), replicationType, replicationFactor); activePipelines.add(pipeline); - activePipelineMap.put(pipeline.getPipelineName(), pipeline); + pipelineMap.put(pipeline.getPipelineName(), pipeline); node2PipelineMap.addPipeline(pipeline); } return pipeline; } /** - * Close the pipeline with the given clusterId. + * Remove the pipeline from active allocation + * @param pipeline pipeline to be finalized + */ + public synchronized void finalizePipeline(Pipeline pipeline) { + activePipelines.remove(pipeline); + } + + /** + * + * @param pipeline */ - public abstract void closePipeline(String pipelineID) throws IOException; + public void closePipeline(Pipeline pipeline) { + pipelineMap.remove(pipeline.getPipelineName()); + node2PipelineMap.removePipeline(pipeline); + } /** * list members in the pipeline . 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index b1e1dd0..ebe39d0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -19,11 +19,14 @@ package org.apache.hadoop.hdds.scm.pipelines; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms .SCMContainerPlacementRandom; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl; @@ -33,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.common.statemachine .InvalidStateTransitionException; @@ -48,6 +52,7 @@ import 
java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.HashSet; import java.util.List; +import java.util.NavigableSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -65,6 +70,8 @@ public class PipelineSelector { private final ContainerPlacementPolicy placementPolicy; private final NodeManager nodeManager; private final Configuration conf; + private final ContainerStateManager containerStateManager; + private final EventPublisher eventPublisher; private final RatisManagerImpl ratisManager; private final StandaloneManagerImpl standaloneManager; private final long containerSize; @@ -79,9 +86,12 @@ public class PipelineSelector { * @param nodeManager - node manager * @param conf - Ozone Config */ - public PipelineSelector(NodeManager nodeManager, Configuration conf) { + public PipelineSelector(NodeManager nodeManager, + ContainerStateManager containerStateManager, Configuration conf, + EventPublisher eventPublisher) { this.nodeManager = nodeManager; this.conf = conf; + this.eventPublisher = eventPublisher; this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); this.containerSize = OzoneConsts.GB * this.conf.getInt( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, @@ -99,6 +109,7 @@ public class PipelineSelector { ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT, ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + this.containerStateManager = containerStateManager; pipelineLeaseManager = new LeaseManager<>("PipelineCreation", pipelineCreationLeaseTimeout); pipelineLeaseManager.start(); @@ -306,15 +317,54 @@ public class PipelineSelector { } /** - * Close the pipeline with the given clusterId. + * Finalize a given pipeline. 
*/ + public void finalizePipeline(Pipeline pipeline) throws IOException { + PipelineManager manager = getPipelineManager(pipeline.getType()); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getPipelineName()); + // Remove the pipeline from active allocation + manager.finalizePipeline(pipeline); + updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE); + closePipelineIfNoOpenContainers(pipeline); + } - public void closePipeline(ReplicationType replicationType, String - pipelineID) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); + /** + * Close a given pipeline. + */ + public void closePipelineIfNoOpenContainers(Pipeline pipeline) throws IOException { + if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) { + return; + } + NavigableSet<ContainerID> containerIDS = containerStateManager + .getMatchingContainerIDsByPipeline(pipeline.getPipelineName()); + if (containerIDS.size() == 0) { + updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE); + LOG.info("Closing pipeline. pipelineID: {}", pipeline.getPipelineName()); + } + } + + /** + * Close a given pipeline. + */ + private void closePipeline(Pipeline pipeline) { + PipelineManager manager = getPipelineManager(pipeline.getType()); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Closing pipeline. pipelineID: {}", pipelineID); - manager.closePipeline(pipelineID); + LOG.debug("Closing pipeline. 
pipelineID: {}", pipeline.getPipelineName()); + NavigableSet<ContainerID> containers = + containerStateManager + .getMatchingContainerIDsByPipeline(pipeline.getPipelineName()); + Preconditions.checkArgument(containers.size() == 0); + manager.closePipeline(pipeline); + } + + private void closeContainersByPipeline(Pipeline pipeline) { + NavigableSet<ContainerID> containers = + containerStateManager + .getMatchingContainerIDsByPipeline(pipeline.getPipelineName()); + for (ContainerID id : containers) { + eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id); + } } /** @@ -352,7 +402,7 @@ public class PipelineSelector { node2PipelineMap.getPipelines(dnId); for (Pipeline pipeline : pipelineSet) { getPipelineManager(pipeline.getType()) - .removePipeline(pipeline); + .closePipeline(pipeline); } node2PipelineMap.removeDatanode(dnId); } @@ -398,12 +448,12 @@ public class PipelineSelector { break; case FINALIZE: - //TODO: cleanup pipeline by closing all the containers on the pipeline + closeContainersByPipeline(pipeline); break; case CLOSE: case TIMEOUT: - // TODO: Release the nodes here when pipelines are destroyed + closePipeline(pipeline); break; default: throw new SCMException("Unsupported pipeline LifeCycleEvent.", http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java index c726ef6..fdd0605 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java @@ -108,13 +108,15 @@ public class RatisManagerImpl extends 
PipelineManager { } /** - * Close the pipeline with the given clusterId. - * - * @param pipelineID + * Close the pipeline. */ - @Override - public void closePipeline(String pipelineID) throws IOException { - + public void closePipeline(Pipeline pipeline) { + super.closePipeline(pipeline); + for (DatanodeDetails node : pipeline.getMachines()) { + // A node should always be in the ratis members list. + Preconditions.checkArgument(ratisMembers.remove(node)); + } + //TODO: should the raft ring also be destroyed as well? } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java index bb4951f..0506e59 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -101,13 +101,14 @@ public class StandaloneManagerImpl extends PipelineManager { } /** - * Close the pipeline with the given clusterId. - * - * @param pipelineID + * Close the pipeline. */ - @Override - public void closePipeline(String pipelineID) throws IOException { - + public void closePipeline(Pipeline pipeline) { + super.closePipeline(pipeline); + for (DatanodeDetails node : pipeline.getMachines()) { + // A node should always be in the standalone members list. 
+ Preconditions.checkArgument(standAloneMembers.remove(node)); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 165805f..be8fb43 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -192,7 +192,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl scmNodeManager = new SCMNodeManager( conf, scmStorage.getClusterID(), this, eventQueue); scmContainerManager = new ContainerMapping( - conf, getScmNodeManager(), cacheSize); + conf, getScmNodeManager(), cacheSize, eventQueue); scmBlockManager = new BlockManagerImpl( conf, getScmNodeManager(), scmContainerManager, eventQueue); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index 06e7420..7049029 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import 
org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -73,7 +74,8 @@ public class TestBlockManager { throw new IOException("Unable to create test directory path"); } nodeManager = new MockNodeManager(true, 10); - mapping = new ContainerMapping(conf, nodeManager, 128); + mapping = + new ContainerMapping(conf, nodeManager, 128, new EventQueue()); blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null); if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){ http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index 0764b12..543cad3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -66,7 +66,8 @@ public class TestCloseContainerEventHandler { configuration .set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); nodeManager = new MockNodeManager(true, 10); - mapping = new ContainerMapping(configuration, nodeManager, 128); + mapping = new ContainerMapping(configuration, nodeManager, 128, + new EventQueue()); eventQueue = new EventQueue(); eventQueue.addHandler(CLOSE_CONTAINER, new 
CloseContainerEventHandler(mapping)); @@ -122,12 +123,7 @@ public class TestCloseContainerEventHandler { // state, so firing close container event should not queue CLOSE // command in the Datanode Assert.assertEquals(0, nodeManager.getCommandCount(datanode)); - // Make sure the information is logged - Assert.assertTrue(logCapturer.getOutput().contains( - "container with id : " + id.getId() - + " is in ALLOCATED state and need not be closed")); //Execute these state transitions so that we can close the container. - mapping.updateContainerState(id.getId(), CREATE); mapping.updateContainerState(id.getId(), CREATED); eventQueue.fireEvent(CLOSE_CONTAINER, new ContainerID( @@ -164,12 +160,7 @@ public class TestCloseContainerEventHandler { Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details)); i++; } - // Make sure the information is logged - Assert.assertTrue(logCapturer.getOutput().contains( - "container with id : " + id.getId() - + " is in ALLOCATED state and need not be closed")); //Execute these state transitions so that we can close the container. 
- mapping.updateContainerState(id.getId(), CREATE); mapping.updateContainerState(id.getId(), CREATED); eventQueue.fireEvent(CLOSE_CONTAINER, id); eventQueue.processAll(1000); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java index 79ac9cf..6269514 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -84,7 +85,8 @@ public class TestContainerMapping { throw new IOException("Unable to create test directory path"); } nodeManager = new MockNodeManager(true, 10); - mapping = new ContainerMapping(conf, nodeManager, 128); + mapping = new ContainerMapping(conf, nodeManager, 128, + new EventQueue()); xceiverClientManager = new XceiverClientManager(conf); random = new Random(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java index cc25544..0c0f25d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -79,7 +80,8 @@ public class TestContainerCloser { configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); nodeManager = new MockNodeManager(true, 10); - mapping = new ContainerMapping(configuration, nodeManager, 128); + mapping = new ContainerMapping(configuration, nodeManager, 128, + new EventQueue()); } @AfterClass http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 2fef620..52963c0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -105,9 +105,10 @@ public class TestContainerPlacement { ContainerMapping 
createContainerManager(Configuration config, NodeManager scmNodeManager) throws IOException { + EventQueue eventQueue = new EventQueue(); final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, OZONE_SCM_DB_CACHE_SIZE_DEFAULT); - return new ContainerMapping(config, scmNodeManager, cacheSize); + return new ContainerMapping(config, scmNodeManager, cacheSize, eventQueue); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java new file mode 100644 index 0000000..24e25ab --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ * + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers + .ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.NavigableSet; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ReplicationFactor.THREE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ReplicationType.RATIS; + +public class TestPipelineClose { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static StorageContainerManager scm; + private static ContainerWithPipeline ratisContainer1; + private static ContainerWithPipeline ratisContainer2; + private static ContainerStateMap stateMap; + private static ContainerMapping mapping; + private static PipelineSelector pipelineSelector; + + /** + * Create a MiniOzoneCluster for testing. 
+ * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(6).build(); + cluster.waitForClusterToBeReady(); + scm = cluster.getStorageContainerManager(); + mapping = (ContainerMapping)scm.getScmContainerManager(); + stateMap = mapping.getStateManager().getContainerStateMap(); + ratisContainer1 = mapping.allocateContainer(RATIS, THREE, "testOwner"); + ratisContainer2 = mapping.allocateContainer(RATIS, THREE, "testOwner"); + pipelineSelector = mapping.getPipelineSelector(); + // At this stage, there should be 2 pipelines, each with 1 open container. + // Try closing both the pipelines, one with a closed container and + // the other with an open container. + } + + /** + * Shut down the MiniOzoneCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + + @Test + public void testPipelineCloseWithClosedContainer() throws IOException { + NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline( + ratisContainer1.getPipeline().getPipelineName()); + + long cId = ratisContainer1.getContainerInfo().getContainerID(); + Assert.assertEquals(1, set.size()); + Assert.assertEquals(cId, set.first().getId()); + + // Now close the container and it should not show up while fetching + // containers by pipeline + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE); + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED); + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE); + mapping + .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE); + + NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline( + ratisContainer1.getPipeline().getPipelineName()); + Assert.assertEquals(0, setClosed.size()); + + pipelineSelector.finalizePipeline(ratisContainer1.getPipeline()); + Pipeline pipeline1 = 
pipelineSelector + .getPipeline(ratisContainer1.getPipeline().getPipelineName(), + ratisContainer1.getContainerInfo().getReplicationType()); + Assert.assertNull(pipeline1); + Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(), + HddsProtos.LifeCycleState.CLOSED); + for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) { + // Assert that the pipeline has been removed from Node2PipelineMap as well + Assert.assertEquals(pipelineSelector.getNode2PipelineMap() + .getPipelines(dn.getUuid()).size(), 0); + } + } + + @Test + public void testPipelineCloseWithOpenContainer() throws IOException, + TimeoutException, InterruptedException { + NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline( + ratisContainer2.getPipeline().getPipelineName()); + Assert.assertEquals(1, setOpen.size()); + + long cId2 = ratisContainer2.getContainerInfo().getContainerID(); + mapping + .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATE); + mapping + .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATED); + pipelineSelector.finalizePipeline(ratisContainer2.getPipeline()); + Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(), + HddsProtos.LifeCycleState.CLOSING); + Pipeline pipeline2 = pipelineSelector + .getPipeline(ratisContainer2.getPipeline().getPipelineName(), + ratisContainer2.getContainerInfo().getReplicationType()); + Assert.assertEquals(pipeline2.getLifeCycleState(), + HddsProtos.LifeCycleState.CLOSING); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java index a878627..65bd036 100644 
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.scm; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -115,7 +116,8 @@ public class TestContainerSQLCli { cluster.getStorageContainerManager().stop(); nodeManager = cluster.getStorageContainerManager().getScmNodeManager(); - mapping = new ContainerMapping(conf, nodeManager, 128); + mapping = new ContainerMapping(conf, nodeManager, 128, + new EventQueue()); blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null); // blockManager.allocateBlock() will create containers if there is none --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org