HDDS-277. PipelineStateMachine should handle closure of pipelines in SCM. 
Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd31cb6c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd31cb6c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd31cb6c

Branch: refs/heads/YARN-3409
Commit: fd31cb6cfeef0c7e9bb0a054cb0f78853df8976f
Parents: be150a1
Author: Xiaoyu Yao <x...@apache.org>
Authored: Thu Jul 26 13:15:27 2018 -0700
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Thu Jul 26 13:15:55 2018 -0700

----------------------------------------------------------------------
 .../container/common/helpers/ContainerInfo.java |   7 +-
 .../container/CloseContainerEventHandler.java   |  28 ++--
 .../hdds/scm/container/ContainerMapping.java    |  16 +-
 .../scm/container/ContainerStateManager.java    |  11 ++
 .../scm/container/states/ContainerStateMap.java |   2 +-
 .../hdds/scm/pipelines/Node2PipelineMap.java    |  33 ++--
 .../hdds/scm/pipelines/PipelineManager.java     |  31 ++--
 .../hdds/scm/pipelines/PipelineSelector.java    |  70 +++++++--
 .../scm/pipelines/ratis/RatisManagerImpl.java   |  14 +-
 .../standalone/StandaloneManagerImpl.java       |  13 +-
 .../scm/server/StorageContainerManager.java     |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java |   4 +-
 .../TestCloseContainerEventHandler.java         |  13 +-
 .../scm/container/TestContainerMapping.java     |   4 +-
 .../container/closer/TestContainerCloser.java   |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java   |   3 +-
 .../hdds/scm/pipeline/TestPipelineClose.java    | 152 +++++++++++++++++++
 .../hadoop/ozone/scm/TestContainerSQLCli.java   |   4 +-
 18 files changed, 331 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
index 4074b21..b194c14 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java
@@ -459,12 +459,13 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
 
   /**
    * Check if a container is in open state, this will check if the
-   * container is either open or allocated or creating. Any containers in
-   * these states is managed as an open container by SCM.
+   * container is either open, allocated, creating or closing.
+   * Any container in these states is managed as an open container by SCM.
    */
   public boolean isContainerOpen() {
     return state == HddsProtos.LifeCycleState.ALLOCATED ||
         state == HddsProtos.LifeCycleState.CREATING ||
-        state == HddsProtos.LifeCycleState.OPEN;
+        state == HddsProtos.LifeCycleState.OPEN ||
+        state == HddsProtos.LifeCycleState.CLOSING;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 859e5d5..949eb13 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
@@ -63,13 +62,13 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
           containerManager.getContainerWithPipeline(containerID.getId());
       info = containerWithPipeline.getContainerInfo();
       if (info == null) {
-        LOG.info("Failed to update the container state. Container with id : {} "
+        LOG.error("Failed to update the container state. Container with id : {} "
             + "does not exist", containerID.getId());
         return;
       }
     } catch (IOException e) {
-      LOG.info("Failed to update the container state. Container with id : {} "
-          + "does not exist", containerID.getId());
+      LOG.error("Failed to update the container state. Container with id : {} "
+          + "does not exist", containerID.getId(), e);
       return;
     }
 
@@ -85,11 +84,22 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
       try {
         // Finalize event will make sure the state of the container transitions
         // from OPEN to CLOSING in containerStateManager.
-        containerManager.getStateManager()
-            .updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE);
-      } catch (SCMException ex) {
-        LOG.error("Failed to update the container state for container : {}"
-            + containerID);
+        containerManager.updateContainerState(containerID.getId(),
+            HddsProtos.LifeCycleEvent.FINALIZE);
+      } catch (IOException ex) {
+        LOG.error("Failed to update the container state to FINALIZE for"
+            + "container : {}" + containerID, ex);
+      }
+    } else if (info.getState() == HddsProtos.LifeCycleState.ALLOCATED) {
+      try {
+        // Create event will make sure the state of the container transitions
+        // from ALLOCATED to CREATING in containerStateManager, this will move
+        // the container out of active allocation path.
+        containerManager.updateContainerState(containerID.getId(),
+            HddsProtos.LifeCycleEvent.CREATE);
+      } catch (IOException ex) {
+        LOG.error("Failed to update the container state to CREATE for"
+            + "container:{}" + containerID, ex);
       }
     } else {
       LOG.info("container with id : {} is in {} state and need not be closed.",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index e17fe3d..d84551a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseException;
@@ -104,7 +105,7 @@ public class ContainerMapping implements Mapping {
   @SuppressWarnings("unchecked")
   public ContainerMapping(
       final Configuration conf, final NodeManager nodeManager, final int
-      cacheSizeMB) throws IOException {
+      cacheSizeMB, EventPublisher eventPublisher) throws IOException {
     this.nodeManager = nodeManager;
     this.cacheSize = cacheSizeMB;
     this.closer = new ContainerCloser(nodeManager, conf);
@@ -122,14 +123,15 @@ public class ContainerMapping implements Mapping {
 
     this.lock = new ReentrantLock();
 
-    this.pipelineSelector = new PipelineSelector(nodeManager, conf);
-
     // To be replaced with code getStorageSize once it is committed.
     size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB,
         OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024;
     this.containerStateManager =
         new ContainerStateManager(conf, this);
 
+    this.pipelineSelector = new PipelineSelector(nodeManager,
+        containerStateManager, conf, eventPublisher);
+
     this.containerCloseThreshold = conf.getFloat(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -372,6 +374,12 @@ public class ContainerMapping implements Mapping {
       // Like releasing the lease in case of BEGIN_CREATE.
       ContainerInfo updatedContainer = containerStateManager
           .updateContainerState(containerInfo, event);
+      if (!updatedContainer.isContainerOpen()) {
+        Pipeline pipeline = pipelineSelector
+            .getPipeline(containerInfo.getPipelineName(),
+                containerInfo.getReplicationType());
+        pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
+      }
       containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
       return updatedContainer.getState();
     } catch (LeaseException e) {
@@ -446,7 +454,7 @@ public class ContainerMapping implements Mapping {
         .getPipeline(containerInfo.getPipelineName(),
             containerInfo.getReplicationType());
     if (pipeline == null) {
-      pipelineSelector
+      pipeline = pipelineSelector
           .getReplicationPipeline(containerInfo.getReplicationType(),
               containerInfo.getReplicationFactor());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index b2431dc..f0ab213 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -463,6 +463,17 @@ public class ContainerStateManager implements Closeable {
   }
 
   /**
+   * Returns a set of open ContainerIDs that reside on a pipeline.
+   *
+   * @param pipeline Pipeline of the Containers.
+   * @return Set of containers that match the specific query parameters.
+   */
+  public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(String
+      pipeline) {
+    return containers.getOpenContainerIDsByPipeline(pipeline);
+  }
+
+  /**
    * Returns the containerInfo with pipeline for the given container id.
    * @param selector -- Pipeline selector class.
    * @param containerID id of the container

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 46fe2ab..b358b7c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -346,7 +346,7 @@ public class ContainerStateMap {
     }
     // In case the container is set to closed state, it needs to be removed from
     // the pipeline Map.
-    if (newState == LifeCycleState.CLOSED) {
+    if (!info.isContainerOpen()) {
       openPipelineMap.remove(info.getPipelineName(), id);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
index 2e89616..b860082 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
@@ -102,18 +102,27 @@ public class Node2PipelineMap {
         Collections.unmodifiableSet(v));
   }
 
-/**
- * Adds a pipeline entry to a given dataNode in the map.
- * @param pipeline Pipeline to be added
- */
- public synchronized void addPipeline(Pipeline pipeline) throws SCMException {
-   for (DatanodeDetails details : pipeline.getDatanodes().values()) {
-     UUID dnId = details.getUuid();
-     dn2PipelineMap
-         .computeIfAbsent(dnId,k->Collections.synchronizedSet(new HashSet<>()))
-         .add(pipeline);
-   }
- }
+  /**
+   * Adds a pipeline entry to a given dataNode in the map.
+   * @param pipeline Pipeline to be added
+   */
+  public synchronized void addPipeline(Pipeline pipeline) {
+    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
+      UUID dnId = details.getUuid();
+      dn2PipelineMap
+          .computeIfAbsent(dnId,
+              k -> Collections.synchronizedSet(new HashSet<>()))
+          .add(pipeline);
+    }
+  }
+
+  public synchronized void removePipeline(Pipeline pipeline) {
+    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
+      UUID dnId = details.getUuid();
+      dn2PipelineMap.computeIfPresent(dnId,
+          (k, v) -> {v.remove(pipeline); return v;});
+    }
+  }
 
   public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
     return Collections.unmodifiableMap(dn2PipelineMap);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 77d8211..266b1f3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -38,14 +38,14 @@ public abstract class PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineManager.class);
   private final List<Pipeline> activePipelines;
-  private final Map<String, Pipeline> activePipelineMap;
+  private final Map<String, Pipeline> pipelineMap;
   private final AtomicInteger pipelineIndex;
   private final Node2PipelineMap node2PipelineMap;
 
   public PipelineManager(Node2PipelineMap map) {
     activePipelines = new LinkedList<>();
     pipelineIndex = new AtomicInteger(0);
-    activePipelineMap = new WeakHashMap<>();
+    pipelineMap = new WeakHashMap<>();
     node2PipelineMap = map;
   }
 
@@ -85,8 +85,8 @@ public abstract class PipelineManager {
     Pipeline pipeline = null;
 
     // 1. Check if pipeline already exists
-    if (activePipelineMap.containsKey(pipelineName)) {
-      pipeline = activePipelineMap.get(pipelineName);
+    if (pipelineMap.containsKey(pipelineName)) {
+      pipeline = pipelineMap.get(pipelineName);
       LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
       return pipeline;
     } else {
@@ -115,11 +115,6 @@ public abstract class PipelineManager {
    */
   public abstract void initializePipeline(Pipeline pipeline) throws IOException;
 
-  public void removePipeline(Pipeline pipeline) {
-    activePipelines.remove(pipeline);
-    activePipelineMap.remove(pipeline.getPipelineName());
-  }
-
   /**
    * Find a Pipeline that is operational.
    *
@@ -172,16 +167,28 @@ public abstract class PipelineManager {
               + "replicationType:{} replicationFactor:{}",
           pipeline.getPipelineName(), replicationType, replicationFactor);
       activePipelines.add(pipeline);
-      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+      pipelineMap.put(pipeline.getPipelineName(), pipeline);
       node2PipelineMap.addPipeline(pipeline);
     }
     return pipeline;
   }
 
   /**
-   * Close the  pipeline with the given clusterId.
+   * Remove the pipeline from active allocation
+   * @param pipeline pipeline to be finalized
+   */
+  public synchronized void finalizePipeline(Pipeline pipeline) {
+    activePipelines.remove(pipeline);
+  }
+
+  /**
+   * Remove the pipeline from the pipeline map and the node2PipelineMap.
+   * @param pipeline pipeline to be closed
    */
-  public abstract void closePipeline(String pipelineID) throws IOException;
+  public void closePipeline(Pipeline pipeline) {
+    pipelineMap.remove(pipeline.getPipelineName());
+    node2PipelineMap.removePipeline(pipeline);
+  }
 
   /**
    * list members in the pipeline .

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index b1e1dd0..ebe39d0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.hdds.scm.pipelines;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
@@ -33,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.statemachine
     .InvalidStateTransitionException;
@@ -48,6 +52,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -65,6 +70,8 @@ public class PipelineSelector {
   private final ContainerPlacementPolicy placementPolicy;
   private final NodeManager nodeManager;
   private final Configuration conf;
+  private final ContainerStateManager containerStateManager;
+  private final EventPublisher eventPublisher;
   private final RatisManagerImpl ratisManager;
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
@@ -79,9 +86,12 @@ public class PipelineSelector {
    * @param nodeManager - node manager
    * @param conf - Ozone Config
    */
-  public PipelineSelector(NodeManager nodeManager, Configuration conf) {
+  public PipelineSelector(NodeManager nodeManager,
+      ContainerStateManager containerStateManager, Configuration conf,
+      EventPublisher eventPublisher) {
     this.nodeManager = nodeManager;
     this.conf = conf;
+    this.eventPublisher = eventPublisher;
     this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
     this.containerSize = OzoneConsts.GB * this.conf.getInt(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
@@ -99,6 +109,7 @@ public class PipelineSelector {
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.containerStateManager = containerStateManager;
     pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
         pipelineCreationLeaseTimeout);
     pipelineLeaseManager.start();
@@ -306,15 +317,54 @@ public class PipelineSelector {
   }
 
   /**
-   * Close the  pipeline with the given clusterId.
+   * Finalize a given pipeline.
    */
+  public void finalizePipeline(Pipeline pipeline) throws IOException {
+    PipelineManager manager = getPipelineManager(pipeline.getType());
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getPipelineName());
+    // Remove the pipeline from active allocation
+    manager.finalizePipeline(pipeline);
+    updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
+    closePipelineIfNoOpenContainers(pipeline);
+  }
 
-  public void closePipeline(ReplicationType replicationType, String
-      pipelineID) throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
+  /**
+   * Close a given pipeline if it is CLOSING and has no open containers.
+   */
+  public void closePipelineIfNoOpenContainers(Pipeline pipeline) throws IOException {
+    if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
+      return;
+    }
+    NavigableSet<ContainerID> containerIDS = containerStateManager
+        .getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
+    if (containerIDS.size() == 0) {
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
+      LOG.info("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
+    }
+  }
+
+  /**
+   * Close a given pipeline.
+   */
+  private void closePipeline(Pipeline pipeline) {
+    PipelineManager manager = getPipelineManager(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Closing pipeline. pipelineID: {}", pipelineID);
-    manager.closePipeline(pipelineID);
+    LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getPipelineName());
+    NavigableSet<ContainerID> containers =
+        containerStateManager
+            .getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
+    Preconditions.checkArgument(containers.size() == 0);
+    manager.closePipeline(pipeline);
+  }
+
+  private void closeContainersByPipeline(Pipeline pipeline) {
+    NavigableSet<ContainerID> containers =
+        containerStateManager
+            .getMatchingContainerIDsByPipeline(pipeline.getPipelineName());
+    for (ContainerID id : containers) {
+      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
+    }
   }
 
   /**
@@ -352,7 +402,7 @@ public class PipelineSelector {
         node2PipelineMap.getPipelines(dnId);
     for (Pipeline pipeline : pipelineSet) {
       getPipelineManager(pipeline.getType())
-          .removePipeline(pipeline);
+          .closePipeline(pipeline);
     }
     node2PipelineMap.removeDatanode(dnId);
   }
@@ -398,12 +448,12 @@ public class PipelineSelector {
         break;
 
       case FINALIZE:
-        //TODO: cleanup pipeline by closing all the containers on the pipeline
+        closeContainersByPipeline(pipeline);
         break;
 
       case CLOSE:
       case TIMEOUT:
-        // TODO: Release the nodes here when pipelines are destroyed
+        closePipeline(pipeline);
         break;
       default:
         throw new SCMException("Unsupported pipeline LifeCycleEvent.",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index c726ef6..fdd0605 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -108,13 +108,15 @@ public class RatisManagerImpl extends PipelineManager {
   }
 
   /**
-   * Close the  pipeline with the given clusterId.
-   *
-   * @param pipelineID
+   * Close the pipeline.
    */
-  @Override
-  public void closePipeline(String pipelineID) throws IOException {
-
+  public void closePipeline(Pipeline pipeline) {
+    super.closePipeline(pipeline);
+    for (DatanodeDetails node : pipeline.getMachines()) {
+      // A node should always be in the ratis members list.
+      Preconditions.checkArgument(ratisMembers.remove(node));
+    }
+    //TODO: should the raft ring also be destroyed as well?
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index bb4951f..0506e59 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -101,13 +101,14 @@ public class StandaloneManagerImpl extends PipelineManager {
   }
 
   /**
-   * Close the  pipeline with the given clusterId.
-   *
-   * @param pipelineID
+   * Close the pipeline.
    */
-  @Override
-  public void closePipeline(String pipelineID) throws IOException {
-
+  public void closePipeline(Pipeline pipeline) {
+    super.closePipeline(pipeline);
+    for (DatanodeDetails node : pipeline.getMachines()) {
+      // A node should always be in the standalone members list.
+      Preconditions.checkArgument(standAloneMembers.remove(node));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 165805f..be8fb43 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -192,7 +192,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     scmNodeManager = new SCMNodeManager(
         conf, scmStorage.getClusterID(), this, eventQueue);
     scmContainerManager = new ContainerMapping(
-        conf, getScmNodeManager(), cacheSize);
+        conf, getScmNodeManager(), cacheSize, eventQueue);
     scmBlockManager = new BlockManagerImpl(
         conf, getScmNodeManager(), scmContainerManager, eventQueue);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 06e7420..7049029 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -73,7 +74,8 @@ public class TestBlockManager {
       throw new IOException("Unable to create test directory path");
     }
     nodeManager = new MockNodeManager(true, 10);
-    mapping = new ContainerMapping(conf, nodeManager, 128);
+    mapping =
+        new ContainerMapping(conf, nodeManager, 128, new EventQueue());
     blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
     if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
         ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 0764b12..543cad3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -66,7 +66,8 @@ public class TestCloseContainerEventHandler {
     configuration
         .set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
-    mapping = new ContainerMapping(configuration, nodeManager, 128);
+    mapping = new ContainerMapping(configuration, nodeManager, 128,
+        new EventQueue());
     eventQueue = new EventQueue();
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(mapping));
@@ -122,12 +123,7 @@ public class TestCloseContainerEventHandler {
     // state, so firing close container event should not queue CLOSE
     // command in the Datanode
     Assert.assertEquals(0, nodeManager.getCommandCount(datanode));
-    // Make sure the information is logged
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "container with id : " + id.getId()
-            + " is in ALLOCATED state and need not be closed"));
     //Execute these state transitions so that we can close the container.
-    mapping.updateContainerState(id.getId(), CREATE);
     mapping.updateContainerState(id.getId(), CREATED);
     eventQueue.fireEvent(CLOSE_CONTAINER,
         new ContainerID(
@@ -164,12 +160,7 @@ public class TestCloseContainerEventHandler {
       Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
       i++;
     }
-    // Make sure the information is logged
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "container with id : " + id.getId()
-            + " is in ALLOCATED state and need not be closed"));
     //Execute these state transitions so that we can close the container.
-    mapping.updateContainerState(id.getId(), CREATE);
     mapping.updateContainerState(id.getId(), CREATED);
     eventQueue.fireEvent(CLOSE_CONTAINER, id);
     eventQueue.processAll(1000);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
index 79ac9cf..6269514 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -84,7 +85,8 @@ public class TestContainerMapping {
       throw new IOException("Unable to create test directory path");
     }
     nodeManager = new MockNodeManager(true, 10);
-    mapping = new ContainerMapping(conf, nodeManager, 128);
+    mapping = new ContainerMapping(conf, nodeManager, 128,
+        new EventQueue());
     xceiverClientManager = new XceiverClientManager(conf);
     random = new Random();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
index cc25544..0c0f25d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/closer/TestContainerCloser.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -79,7 +80,8 @@ public class TestContainerCloser {
     configuration.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
         testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
-    mapping = new ContainerMapping(configuration, nodeManager, 128);
+    mapping = new ContainerMapping(configuration, nodeManager, 128,
+        new EventQueue());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 2fef620..52963c0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -105,9 +105,10 @@ public class TestContainerPlacement {
 
   ContainerMapping createContainerManager(Configuration config,
       NodeManager scmNodeManager) throws IOException {
+    EventQueue eventQueue = new EventQueue();
     final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-    return new ContainerMapping(config, scmNodeManager, cacheSize);
+    return new ContainerMapping(config, scmNodeManager, cacheSize, eventQueue);
 
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
new file mode 100644
index 0000000..24e25ab
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationType.RATIS;
+
+public class TestPipelineClose {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static StorageContainerManager scm;
+  private static ContainerWithPipeline ratisContainer1;
+  private static ContainerWithPipeline ratisContainer2;
+  private static ContainerStateMap stateMap;
+  private static ContainerMapping mapping;
+  private static PipelineSelector pipelineSelector;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(6).build();
+    cluster.waitForClusterToBeReady();
+    scm = cluster.getStorageContainerManager();
+    mapping = (ContainerMapping)scm.getScmContainerManager();
+    stateMap = mapping.getStateManager().getContainerStateMap();
+    ratisContainer1 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    ratisContainer2 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    pipelineSelector = mapping.getPipelineSelector();
+    // At this stage, there should be 2 pipeline one with 1 open container each.
+    // Try closing the both the pipelines, one with a closed container and
+    // the other with an open container.
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+
+  @Test
+  public void testPipelineCloseWithClosedContainer() throws IOException {
+    NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+        ratisContainer1.getPipeline().getPipelineName());
+
+    long cId = ratisContainer1.getContainerInfo().getContainerID();
+    Assert.assertEquals(1, set.size());
+    Assert.assertEquals(cId, set.first().getId());
+
+    // Now close the container and it should not show up while fetching
+    // containers by pipeline
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATE);
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.CREATED);
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
+    mapping
+        .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
+
+    NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline(
+        ratisContainer1.getPipeline().getPipelineName());
+    Assert.assertEquals(0, setClosed.size());
+
+    pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
+    Pipeline pipeline1 = pipelineSelector
+        .getPipeline(ratisContainer1.getPipeline().getPipelineName(),
+            ratisContainer1.getContainerInfo().getReplicationType());
+    Assert.assertNull(pipeline1);
+    Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
+        HddsProtos.LifeCycleState.CLOSED);
+    for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
+      // Assert that the pipeline has been removed from Node2PipelineMap as well
+      Assert.assertEquals(pipelineSelector.getNode2PipelineMap()
+          .getPipelines(dn.getUuid()).size(), 0);
+    }
+  }
+
+  @Test
+  public void testPipelineCloseWithOpenContainer() throws IOException,
+      TimeoutException, InterruptedException {
+    NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline(
+        ratisContainer2.getPipeline().getPipelineName());
+    Assert.assertEquals(1, setOpen.size());
+
+    long cId2 = ratisContainer2.getContainerInfo().getContainerID();
+    mapping
+        .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATE);
+    mapping
+        .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATED);
+    pipelineSelector.finalizePipeline(ratisContainer2.getPipeline());
+    Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
+        HddsProtos.LifeCycleState.CLOSING);
+    Pipeline pipeline2 = pipelineSelector
+        .getPipeline(ratisContainer2.getPipeline().getPipelineName(),
+            ratisContainer2.getContainerInfo().getReplicationType());
+    Assert.assertEquals(pipeline2.getLifeCycleState(),
+        HddsProtos.LifeCycleState.CLOSING);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd31cb6c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
index a878627..65bd036 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.scm;
 
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -115,7 +116,8 @@ public class TestContainerSQLCli {
     cluster.getStorageContainerManager().stop();
 
     nodeManager = cluster.getStorageContainerManager().getScmNodeManager();
-    mapping = new ContainerMapping(conf, nodeManager, 128);
+    mapping = new ContainerMapping(conf, nodeManager, 128,
+        new EventQueue());
     blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
 
     // blockManager.allocateBlock() will create containers if there is none


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to