HDDS-656. Add logic for pipeline report and action processing in new pipeline 
code. Contributed by Lokesh Jain.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/64a43c92
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/64a43c92
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/64a43c92

Branch: refs/heads/HDDS-4
Commit: 64a43c92c2133f3b9a317dcc4f0391ad6b604194
Parents: 5331387
Author: Nandakumar <na...@apache.org>
Authored: Wed Oct 17 13:56:54 2018 +0530
Committer: Nandakumar <na...@apache.org>
Committed: Wed Oct 17 13:57:38 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientRatis.java     |  19 ++
 .../hadoop/hdds/scm/pipeline/Pipeline.java      | 103 ++++++---
 hadoop-hdds/common/src/main/proto/hdds.proto    |   1 +
 .../scm/pipeline/PipelineActionHandler.java     |  66 ++++++
 .../hdds/scm/pipeline/PipelineFactory.java      |   7 +-
 .../hdds/scm/pipeline/PipelineManager.java      |  13 +-
 .../scm/pipeline/PipelineReportHandler.java     | 104 +++++++++
 .../hdds/scm/pipeline/PipelineStateManager.java | 135 +++---------
 .../hdds/scm/pipeline/PipelineStateMap.java     |  76 ++++---
 .../scm/pipeline/RatisPipelineProvider.java     |  26 ++-
 .../hdds/scm/pipeline/SCMPipelineManager.java   |  81 ++++---
 .../scm/pipeline/SimplePipelineProvider.java    |   6 +-
 .../org/apache/hadoop/hdds/scm/TestUtils.java   |  17 ++
 .../scm/pipeline/TestPipelineStateManager.java  | 209 ++++++++++++++-----
 .../scm/pipeline/TestRatisPipelineProvider.java |  18 +-
 .../scm/pipeline/TestSCMPipelineManager.java    | 187 +++++++++++++++++
 .../pipeline/TestSimplePipelineProvider.java    |  16 +-
 17 files changed, 804 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 4efe7ba..45e9d6e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
@@ -73,6 +74,24 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         retryPolicy);
   }
 
+  public static XceiverClientRatis newXceiverClientRatis(
+      org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
+      Configuration ozoneConf) {
+    final String rpcType = ozoneConf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+    Pipeline pipeline1 =
+        new Pipeline(pipeline.getNodes().get(0).getUuidString(),
+            HddsProtos.LifeCycleState.OPEN, pipeline.getType(),
+            pipeline.getFactor(), PipelineID.valueOf(pipeline.getID().getId()));
+    return new XceiverClientRatis(pipeline1,
+        SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
+        retryPolicy);
+  }
+
   private final Pipeline pipeline;
   private final RpcType rpcType;
   private final AtomicReference<RaftClient> client = new AtomicReference<>();
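
For context, a minimal usage sketch of the new overload. This is a sketch,
not part of the patch: pipeline and conf are caller-supplied values, and the
conversion to the legacy Pipeline happens inside the method as shown above.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

    // Tear down the Ratis ring behind a (new-style) Pipeline. The client
    // is Closeable, matching the try-with-resources call sites added
    // elsewhere in this patch.
    void destroyRing(Pipeline pipeline, Configuration conf) throws IOException {
      try (XceiverClientRatis client =
          XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
        client.destroyPipeline();
      }
    }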

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index b58a001..b22a0c6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -23,12 +23,14 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -40,17 +42,17 @@ public final class Pipeline {
   private final ReplicationType type;
   private final ReplicationFactor factor;
 
-  private LifeCycleState state;
-  private List<DatanodeDetails> nodes;
+  private PipelineState state;
+  private Map<DatanodeDetails, Long> nodeStatus;
 
   private Pipeline(PipelineID id, ReplicationType type,
-      ReplicationFactor factor, LifeCycleState state,
-      List<DatanodeDetails> nodes) {
+      ReplicationFactor factor, PipelineState state,
+      Map<DatanodeDetails, Long> nodeStatus) {
     this.id = id;
     this.type = type;
     this.factor = factor;
     this.state = state;
-    this.nodes = nodes;
+    this.nodeStatus = nodeStatus;
   }
 
   /**
@@ -85,36 +87,68 @@ public final class Pipeline {
    *
    * @return - LifeCycleStates.
    */
-  public LifeCycleState getLifeCycleState() {
+  PipelineState getPipelineState() {
+    // TODO: See if we need to expose this.
     return state;
   }
 
+  public boolean isClosed() {
+    return state == PipelineState.CLOSED;
+  }
+
+  public boolean isOpen() {
+    return state == PipelineState.OPEN;
+  }
+
+  void reportDatanode(DatanodeDetails dn) throws IOException {
+    if (nodeStatus.get(dn) == null) {
+      throw new IOException(
+          String.format("Datanode=%s not part of pipeline=%s", dn, id));
+    }
+    nodeStatus.put(dn, System.currentTimeMillis());
+  }
+
+  boolean isHealthy() {
+    for (Long reportedTime : nodeStatus.values()) {
+      if (reportedTime < 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Returns the list of nodes which form this pipeline.
    *
    * @return List of DatanodeDetails
    */
   public List<DatanodeDetails> getNodes() {
-    return new ArrayList<>(nodes);
+    return new ArrayList<>(nodeStatus.keySet());
   }
 
   public HddsProtos.Pipeline getProtobufMessage() {
-    HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder();
-    builder.setId(id.getProtobuf());
-    builder.setType(type);
-    builder.setState(state);
-    builder.addAllMembers(nodes.stream().map(
-        DatanodeDetails::getProtoBufMessage).collect(Collectors.toList()));
+    HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder()
+        .setId(id.getProtobuf())
+        .setType(type)
+        .setFactor(factor)
+        .setLeaderID("")
+        .addAllMembers(nodeStatus.keySet().stream()
+            .map(DatanodeDetails::getProtoBufMessage)
+            .collect(Collectors.toList()));
     return builder.build();
   }
 
   public static Pipeline fromProtobuf(HddsProtos.Pipeline pipeline) {
-    return new Pipeline(PipelineID.getFromProtobuf(pipeline.getId()),
-        pipeline.getType(), pipeline.getFactor(), pipeline.getState(),
-        pipeline.getMembersList().stream().map(DatanodeDetails::getFromProtoBuf)
-            .collect(Collectors.toList()));
+    return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
+        .setFactor(pipeline.getFactor())
+        .setType(pipeline.getType())
+        .setState(PipelineState.ALLOCATED)
+        .setNodes(pipeline.getMembersList().stream()
+            .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
+        .build();
   }
 
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -131,7 +165,7 @@ public final class Pipeline {
         .append(type, that.type)
         .append(factor, that.factor)
         .append(state, that.state)
-        .append(nodes, that.nodes)
+        .append(nodeStatus, that.nodeStatus)
         .isEquals();
   }
 
@@ -142,7 +176,7 @@ public final class Pipeline {
         .append(type)
         .append(factor)
         .append(state)
-        .append(nodes)
+        .append(nodeStatus)
         .toHashCode();
   }
 
@@ -161,17 +195,17 @@ public final class Pipeline {
     private PipelineID id = null;
     private ReplicationType type = null;
     private ReplicationFactor factor = null;
-    private LifeCycleState state = null;
-    private List<DatanodeDetails> nodes = null;
+    private PipelineState state = null;
+    private Map<DatanodeDetails, Long> nodeStatus = null;
 
     public Builder() {}
 
     public Builder(Pipeline pipeline) {
-      this.id = pipeline.getID();
-      this.type = pipeline.getType();
-      this.factor = pipeline.getFactor();
-      this.state = pipeline.getLifeCycleState();
-      this.nodes = pipeline.getNodes();
+      this.id = pipeline.id;
+      this.type = pipeline.type;
+      this.factor = pipeline.factor;
+      this.state = pipeline.state;
+      this.nodeStatus = pipeline.nodeStatus;
     }
 
     public Builder setId(PipelineID id1) {
@@ -189,13 +223,14 @@ public final class Pipeline {
       return this;
     }
 
-    public Builder setState(LifeCycleState state1) {
+    public Builder setState(PipelineState state1) {
       this.state = state1;
       return this;
     }
 
-    public Builder setNodes(List<DatanodeDetails> nodes1) {
-      this.nodes = nodes1;
+    public Builder setNodes(List<DatanodeDetails> nodes) {
+      this.nodeStatus = new LinkedHashMap<>();
+      nodes.forEach(node -> nodeStatus.put(node, -1L));
       return this;
     }
 
@@ -204,8 +239,12 @@ public final class Pipeline {
       Preconditions.checkNotNull(type);
       Preconditions.checkNotNull(factor);
       Preconditions.checkNotNull(state);
-      Preconditions.checkNotNull(nodes);
-      return new Pipeline(id, type, factor, state, nodes);
+      Preconditions.checkNotNull(nodeStatus);
+      return new Pipeline(id, type, factor, state, nodeStatus);
     }
   }
+
+  enum PipelineState {
+    ALLOCATED, OPEN, CLOSED
+  }
 }
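
The node-status map drives the new report-based lifecycle: Builder#setNodes
seeds every member with -1L, and reportDatanode stamps a real timestamp on
each report. A minimal sketch, assuming same-package access (reportDatanode
and isHealthy are package-private) and a hypothetical DatanodeDetails dn:

    import java.util.Collections;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;

    // A one-node ALLOCATED pipeline; dn starts with report time -1L.
    Pipeline pipeline = Pipeline.newBuilder()
        .setId(PipelineID.randomId())
        .setType(ReplicationType.RATIS)
        .setFactor(ReplicationFactor.ONE)
        .setState(Pipeline.PipelineState.ALLOCATED)
        .setNodes(Collections.singletonList(dn))
        .build();
    assert !pipeline.isHealthy();  // no member has reported yet
    pipeline.reportDatanode(dn);   // stamps System.currentTimeMillis()
    assert pipeline.isHealthy();   // all (one) members have reported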

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index dedc57b..6525134 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -47,6 +47,7 @@ message PipelineID {
 message Pipeline {
     required string leaderID = 1;
     repeated DatanodeDetailsProto members = 2;
+    // TODO: remove the state and leaderID from this class
     optional LifeCycleState state = 3 [default = OPEN];
     optional ReplicationType type = 4 [default = STAND_ALONE];
     optional ReplicationFactor factor = 5 [default = ONE];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
new file mode 100644
index 0000000..a44ce9d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineActionsFromDatanode;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handles pipeline actions from datanode.
+ */
+public class PipelineActionHandler implements
+    EventHandler<PipelineActionsFromDatanode> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      PipelineActionHandler.class);
+
+  private final PipelineManager pipelineManager;
+
+  public PipelineActionHandler(PipelineManager pipelineManager) {
+    this.pipelineManager = pipelineManager;
+  }
+
+  @Override
+  public void onMessage(PipelineActionsFromDatanode report,
+      EventPublisher publisher) {
+    for (PipelineAction action : report.getReport().getPipelineActionsList()) {
+      if (action.getAction() == PipelineAction.Action.CLOSE) {
+        PipelineID pipelineID = null;
+        try {
+          pipelineID = PipelineID.
+              getFromProtobuf(action.getClosePipeline().getPipelineID());
+          pipelineManager.finalizePipeline(pipelineID);
+        } catch (IOException ioe) {
+          LOG.error("Could not execute pipeline action={} pipeline={} {}",
+              action, pipelineID, ioe);
+        }
+      } else {
+        LOG.error("unknown pipeline action:{}" + action.getAction());
+      }
+    }
+  }
+}
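
Wiring sketch, an assumption rather than part of this patch: the handler
would be subscribed on the SCM event queue against the existing
pipeline-actions event, with pipelineManager supplied by the caller.

    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    EventQueue eventQueue = new EventQueue();
    // PIPELINE_ACTIONS carries PipelineActionsFromDatanode payloads.
    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
        new PipelineActionHandler(pipelineManager));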

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 0265ff2..261c544 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -35,13 +36,13 @@ public final class PipelineFactory {
 
   private Map<ReplicationType, PipelineProvider> providers;
 
-  PipelineFactory(NodeManager nodeManager,
-      PipelineStateManager stateManager) {
+  PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
+      Configuration conf) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
         new SimplePipelineProvider(nodeManager));
     providers.put(ReplicationType.RATIS,
-        new RatisPipelineProvider(nodeManager, stateManager));
+        new RatisPipelineProvider(nodeManager, stateManager, conf));
   }
 
   public Pipeline create(ReplicationType type, ReplicationFactor factor)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 2d8cae3..51f9e86 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -41,18 +41,25 @@ public interface PipelineManager extends Closeable {
 
   Pipeline getPipeline(PipelineID pipelineID) throws IOException;
 
+  List<Pipeline> getPipelinesByType(ReplicationType type);
+
+  List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
+      ReplicationFactor factor);
+
   void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
       throws IOException;
 
-  void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID)
-      throws IOException;
+  void removeContainerFromPipeline(PipelineID pipelineID,
+      ContainerID containerID) throws IOException;
 
   Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
       throws IOException;
 
+  int getNumberOfContainers(PipelineID pipelineID) throws IOException;
+
   void finalizePipeline(PipelineID pipelineID) throws IOException;
 
-  void closePipeline(PipelineID pipelineId) throws IOException;
+  void openPipeline(PipelineID pipelineId) throws IOException;
 
   void removePipeline(PipelineID pipelineID) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
new file mode 100644
index 0000000..ad11b47
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handles Pipeline Reports from datanode.
+ */
+public class PipelineReportHandler implements
+    EventHandler<PipelineReportFromDatanode> {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PipelineReportHandler.class);
+  private final PipelineManager pipelineManager;
+  private final Configuration conf;
+
+  public PipelineReportHandler(PipelineManager pipelineManager,
+      Configuration conf) {
+    Preconditions.checkNotNull(pipelineManager);
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+  }
+
+  @Override
+  public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
+      EventPublisher publisher) {
+    Preconditions.checkNotNull(pipelineReportFromDatanode);
+    DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
+    PipelineReportsProto pipelineReport =
+        pipelineReportFromDatanode.getReport();
+    Preconditions.checkNotNull(dn, "Pipeline Report is "
+        + "missing DatanodeDetails.");
+    LOGGER.trace("Processing pipeline report for dn: {}", dn);
+    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+      try {
+        processPipelineReport(report, dn);
+      } catch (IOException e) {
+        LOGGER.error("Could not process pipeline report={} from dn={} {}",
+            report, dn, e);
+      }
+    }
+  }
+
+  private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
+      throws IOException {
+    PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
+    Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
+
+    if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
+      pipeline.reportDatanode(dn);
+      if (pipeline.isHealthy()) {
+        // if all the dns have reported, pipeline can be moved to OPEN state
+        pipelineManager.openPipeline(pipelineID);
+      }
+    } else if (pipeline.isClosed()) {
+      int numContainers = pipelineManager.getNumberOfContainers(pipelineID);
+      if (numContainers == 0) {
+        // if all the containers have been closed the pipeline can be destroyed
+        try (XceiverClientRatis client =
+            XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+          client.destroyPipeline();
+        }
+        // after successfully destroying the pipeline, the pipeline can be
+        // removed from the pipeline manager
+        pipelineManager.removePipeline(pipelineID);
+      }
+    } else {
+      // In OPEN state case just report the datanode
+      pipeline.reportDatanode(dn);
+    }
+  }
+}
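
The report handler drives the whole lifecycle: an ALLOCATED pipeline opens
once every member has reported, a CLOSED pipeline is destroyed and removed
once its last container is gone, and an OPEN pipeline just refreshes node
timestamps. A test-style sketch using the helper this patch adds to
TestUtils (dn, pipeline, pipelineManager, conf and publisher are
caller-supplied):

    PipelineReportFromDatanode report =
        TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID());
    // For an ALLOCATED pipeline whose only member is dn, this moves the
    // pipeline to OPEN via PipelineManager#openPipeline.
    new PipelineReportHandler(pipelineManager, conf)
        .onMessage(report, publisher);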

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 9752b5a..8f5f89a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -19,25 +19,16 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE;
 
 /**
  * Manages the state of pipelines in SCM. All write operations like pipeline
@@ -52,95 +43,9 @@ class PipelineStateManager {
       org.apache.hadoop.hdds.scm.pipelines.PipelineStateManager.class);
 
   private final PipelineStateMap pipelineStateMap;
-  private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
-  private final LeaseManager<Pipeline> pipelineLeaseManager;
 
   PipelineStateManager(Configuration conf) {
     this.pipelineStateMap = new PipelineStateMap();
-    Set<LifeCycleState> finalStates = new HashSet<>();
-    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    // TODO: Use LeaseManager for creation of pipelines.
-    // Add pipeline initialization logic.
-    this.pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
-        pipelineCreationLeaseTimeout);
-    this.pipelineLeaseManager.start();
-
-    finalStates.add(LifeCycleState.CLOSED);
-    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
-        finalStates);
-    initializeStateMachine();
-  }
-
-
-  /*
-   * Event and State Transition Mapping.
-   *
-   * State: ALLOCATED ---------------> CREATING
-   * Event:                CREATE
-   *
-   * State: CREATING  ---------------> OPEN
-   * Event:               CREATED
-   *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
-   *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
-   *
-   * State: CREATING  ---------------> CLOSED
-   * Event:               TIMEOUT
-   *
-   *
-   * Container State Flow:
-   *
-   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
-   *            (CREATE)     | (CREATED)     (FINALIZE)   |
-   *                         |                            |
-   *                         |                            |
-   *                         |(TIMEOUT)                   |(CLOSE)
-   *                         |                            |
-   *                         +--------> [CLOSED] <--------+
-   */
-
-  /**
-   * Add javadoc.
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(LifeCycleState.ALLOCATED,
-        LifeCycleState.CREATING, LifeCycleEvent.CREATE);
-
-    stateMachine.addTransition(LifeCycleState.CREATING,
-        LifeCycleState.OPEN, LifeCycleEvent.CREATED);
-
-    stateMachine.addTransition(LifeCycleState.OPEN,
-        LifeCycleState.CLOSING, LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSING,
-        LifeCycleState.CLOSED, LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(LifeCycleState.CREATING,
-        LifeCycleState.CLOSED, LifeCycleEvent.TIMEOUT);
-  }
-
-  Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleEvent event)
-      throws IOException {
-    Pipeline pipeline = null;
-    try {
-      pipeline = pipelineStateMap.getPipeline(pipelineID);
-      LifeCycleState newState =
-          stateMachine.getNextState(pipeline.getLifeCycleState(), event);
-      return pipelineStateMap.updatePipelineState(pipeline.getID(), newState);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update pipeline state %s, "
-              + "reason: invalid state transition from state: %s upon "
-              + "event: %s.", pipeline.getID(), pipeline.getLifeCycleState(),
-          event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
-    }
   }
 
   void addPipeline(Pipeline pipeline) throws IOException {
@@ -156,14 +61,23 @@ class PipelineStateManager {
     return pipelineStateMap.getPipeline(pipelineID);
   }
 
-  List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
-    return pipelineStateMap.getPipelines(type);
+  List<Pipeline> getPipelinesByType(ReplicationType type) {
+    return pipelineStateMap.getPipelinesByType(type);
+  }
+
+  List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
+      ReplicationFactor factor) {
+    return pipelineStateMap.getPipelinesByTypeAndFactor(type, factor);
   }
 
   Set<ContainerID> getContainers(PipelineID pipelineID) throws IOException {
     return pipelineStateMap.getContainers(pipelineID);
   }
 
+  int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+    return pipelineStateMap.getNumberOfContainers(pipelineID);
+  }
+
   void removePipeline(PipelineID pipelineID) throws IOException {
     pipelineStateMap.removePipeline(pipelineID);
   }
@@ -173,7 +87,24 @@ class PipelineStateManager {
     pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
   }
 
-  void close() {
-    pipelineLeaseManager.shutdown();
+  Pipeline finalizePipeline(PipelineID pipelineId) throws IOException {
+    Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
+    if (!pipeline.isClosed()) {
+      pipeline = pipelineStateMap
+          .updatePipelineState(pipelineId, PipelineState.CLOSED);
+    }
+    return pipeline;
+  }
+
+  Pipeline openPipeline(PipelineID pipelineId) throws IOException {
+    Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
+    if (pipeline.isClosed()) {
+      throw new IOException("Closed pipeline can not be opened");
+    }
+    if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
+      pipeline = pipelineStateMap
+          .updatePipelineState(pipelineId, PipelineState.OPEN);
+    }
+    return pipeline;
   }
 }
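
With the LeaseManager and the five-state StateMachine gone, the lifecycle
collapses to the three PipelineState values. A sketch of the transitions the
two new methods enforce (same-package access assumed, id hypothetical):

    Pipeline p = stateManager.openPipeline(id);  // ALLOCATED -> OPEN
    p = stateManager.finalizePipeline(id);       // OPEN or ALLOCATED -> CLOSED
    stateManager.finalizePipeline(id);           // no-op, already CLOSED
    stateManager.openPipeline(id);               // throws IOException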

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index e3f2393..110d26b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -18,9 +18,10 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@ class PipelineStateMap {
 
   PipelineStateMap() {
 
+    // TODO: Use TreeMap for range operations?
     this.pipelineMap = new HashMap<>();
     this.pipeline2container = new HashMap<>();
 
@@ -86,8 +88,7 @@ class PipelineStateMap {
         "container Id cannot be null");
 
     Pipeline pipeline = getPipeline(pipelineID);
-    // TODO: verify the state we need the pipeline to be in
-    if (!isOpen(pipeline)) {
+    if (!pipeline.isOpen()) {
       throw new IOException(
           String.format("%s is not in open state", pipelineID));
     }
@@ -115,7 +116,7 @@ class PipelineStateMap {
    * @param type - ReplicationType
    * @return List of pipelines which have the specified replication type
    */
-  List<Pipeline> getPipelines(ReplicationType type) {
+  List<Pipeline> getPipelinesByType(ReplicationType type) {
     Preconditions.checkNotNull(type, "Replication type cannot be null");
 
     return pipelineMap.values().stream().filter(p -> p.getType().equals(type))
@@ -123,10 +124,25 @@ class PipelineStateMap {
   }
 
   /**
-   * Get set of containers corresponding to a pipeline.
+   * Get list of open pipelines with the specified replication type and factor.
+   *
+   * @param type - ReplicationType
+   * @param factor - ReplicationFactor
+   * @return List of open pipelines with specified replication type and factor
+   */
+  List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
+      ReplicationFactor factor) {
+    return pipelineMap.values().stream()
+        .filter(pipeline -> pipeline.isOpen() && pipeline.getType() == type
+            && pipeline.getFactor() == factor)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get set of containerIDs corresponding to a pipeline.
    *
    * @param pipelineID - PipelineID
-   * @return Set of Containers belonging to the pipeline
+   * @return Set of containerIDs belonging to the pipeline
    * @throws IOException if pipeline is not found
    */
   Set<ContainerID> getContainers(PipelineID pipelineID)
@@ -139,6 +155,21 @@ class PipelineStateMap {
   }
 
   /**
+   * Get number of containers corresponding to a pipeline.
+   *
+   * @param pipelineID - PipelineID
+   * @return Number of containers belonging to the pipeline
+   * @throws IOException if pipeline is not found
+   */
+  int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+    Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
+    if (containerIDs == null) {
+      throw new IOException(String.format("%s not found", pipelineID));
+    }
+    return containerIDs.size();
+  }
+
+  /**
    * Remove pipeline from the data structures.
    *
    * @param pipelineID - PipelineID of the pipeline to be removed
@@ -147,12 +178,18 @@ class PipelineStateMap {
   void removePipeline(PipelineID pipelineID) throws IOException {
     Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
 
-    //TODO: Add a flag which suppresses exception if pipeline does not exist?
-    Set<ContainerID> containerIDs = getContainers(pipelineID);
+    Pipeline pipeline = getPipeline(pipelineID);
+    if (!pipeline.isClosed()) {
+      throw new IOException(
+          String.format("Pipeline with %s is not yet closed", pipelineID));
+    }
+
+    Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
     if (containerIDs.size() != 0) {
       throw new IOException(
           String.format("Pipeline with %s is not empty", pipelineID));
     }
+
     pipelineMap.remove(pipelineID);
     pipeline2container.remove(pipelineID);
   }
@@ -172,12 +209,8 @@ class PipelineStateMap {
     Preconditions.checkNotNull(containerID,
         "container Id cannot be null");
 
-    Pipeline pipeline = getPipeline(pipelineID);
     Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
     containerIDs.remove(containerID);
-    if (containerIDs.size() == 0 && isClosingOrClosed(pipeline)) {
-      removePipeline(pipelineID);
-    }
   }
 
   /**
@@ -189,24 +222,13 @@ class PipelineStateMap {
    * @return Pipeline with the updated state
    * @throws IOException if pipeline does not exist
    */
-  Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleState state)
+  Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
       throws IOException {
     Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
     Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
 
-    Pipeline pipeline = getPipeline(pipelineID);
-    pipeline = pipelineMap
-        .put(pipelineID, Pipeline.newBuilder(pipeline).setState(state).build());
-    // TODO: Verify if need to throw exception for non-existent pipeline
-    return pipeline;
-  }
-
-  private boolean isClosingOrClosed(Pipeline pipeline) {
-    LifeCycleState state = pipeline.getLifeCycleState();
-    return state == LifeCycleState.CLOSING || state == LifeCycleState.CLOSED;
-  }
-
-  private boolean isOpen(Pipeline pipeline) {
-    return pipeline.getLifeCycleState() == LifeCycleState.OPEN;
+    final Pipeline pipeline = getPipeline(pipelineID);
+    return pipelineMap.compute(pipelineID,
+        (id, p) -> Pipeline.newBuilder(pipeline).setState(state).build());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index b3bed33..400ab24 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -23,11 +23,12 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -44,11 +45,13 @@ public class RatisPipelineProvider implements PipelineProvider {
 
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
+  private final Configuration conf;
 
   RatisPipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager) {
+      PipelineStateManager stateManager, Configuration conf) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
+    this.conf = conf;
   }
 
   /**
@@ -90,7 +93,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   public Pipeline create(ReplicationFactor factor) throws IOException {
     // Get set of datanodes already used for ratis pipeline
     Set<DatanodeDetails> dnsUsed = new HashSet<>();
-    stateManager.getPipelines(ReplicationType.RATIS)
+    stateManager.getPipelinesByType(ReplicationType.RATIS)
         .forEach(p -> dnsUsed.addAll(p.getNodes()));
 
     // Get list of healthy nodes
@@ -107,13 +110,15 @@ public class RatisPipelineProvider implements PipelineProvider {
       throw new IOException(e);
     }
 
-    return Pipeline.newBuilder()
+    Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(LifeCycleState.ALLOCATED)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
         .build();
+    initializePipeline(pipeline);
+    return pipeline;
   }
 
   @Override
@@ -126,10 +131,19 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(LifeCycleState.ALLOCATED)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
         .build();
   }
+
+  private void initializePipeline(Pipeline pipeline)
+      throws IOException {
+    // TODO: remove old code in XceiverClientRatis#newXceiverClientRatis
+    try (XceiverClientRatis client =
+        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+      client.createPipeline();
+    }
+  }
 }
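
Usage sketch via the factory (same-package access assumed, since the
constructors are package-private; nodeManager, stateManager and conf are
caller-supplied): create() now both allocates the pipeline and initializes
the Ratis ring, so the result stays ALLOCATED until every member reports.

    PipelineFactory factory =
        new PipelineFactory(nodeManager, stateManager, conf);
    // Picks healthy datanodes not already in a Ratis pipeline, then calls
    // initializePipeline() -> XceiverClientRatis#createPipeline().
    Pipeline pipeline =
        factory.create(ReplicationType.RATIS, ReplicationFactor.THREE);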

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 3ee5849..6a9c783 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -22,11 +22,12 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
@@ -63,11 +64,14 @@ public class SCMPipelineManager implements PipelineManager {
   private final PipelineStateManager stateManager;
   private final MetadataStore pipelineStore;
 
-  public SCMPipelineManager(Configuration conf, NodeManager nodeManager)
-      throws IOException {
+  private final EventPublisher eventPublisher;
+  private final NodeManager nodeManager;
+
+  public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
+      EventPublisher eventPublisher) throws IOException {
     this.lock = new ReentrantReadWriteLock();
     this.stateManager = new PipelineStateManager(conf);
-    this.pipelineFactory = new PipelineFactory(nodeManager, stateManager);
+    this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
     int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     File metaDir = getOzoneMetaDirPath(conf);
@@ -78,8 +82,10 @@ public class SCMPipelineManager implements PipelineManager {
             .setDbFile(pipelineDBPath)
             .setCacheSize(cacheSize * OzoneConsts.MB)
             .build();
-
     initializePipelineState();
+
+    this.eventPublisher = eventPublisher;
+    this.nodeManager = nodeManager;
   }
 
   private void initializePipelineState() throws IOException {
@@ -95,6 +101,8 @@ public class SCMPipelineManager implements PipelineManager {
           .fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
       Preconditions.checkNotNull(pipeline);
       stateManager.addPipeline(pipeline);
+      // TODO: add pipeline to node manager
+      // nodeManager.addPipeline(pipeline);
     }
   }
 
@@ -104,16 +112,10 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline =  pipelineFactory.create(type, factor);
+      pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
+          pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
-      try {
-        pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
-            pipeline.getProtobufMessage().toByteArray());
-      } catch (IOException ioe) {
-        // if db operation fails we need to revert the pipeline creation in
-        // state manager.
-        stateManager.removePipeline(pipeline.getID());
-        throw ioe;
-      }
+      // TODO: add pipeline to node manager
       return pipeline;
     } finally {
       lock.writeLock().unlock();
@@ -144,6 +146,27 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
+  public List<Pipeline> getPipelinesByType(ReplicationType type) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelinesByType(type);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelinesByTypeAndFactor(ReplicationType type,
+      ReplicationFactor factor) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelinesByTypeAndFactor(type, factor);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public void addContainerToPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {
     lock.writeLock().lock();
@@ -177,27 +200,29 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
+  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+    return stateManager.getNumberOfContainers(pipelineID);
+  }
+
+  @Override
   public void finalizePipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      //TODO: close all containers in this pipeline
-      Pipeline pipeline =
-          stateManager.updatePipelineState(pipelineId, LifeCycleEvent.FINALIZE);
-      pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
-          pipeline.getProtobufMessage().toByteArray());
+      stateManager.finalizePipeline(pipelineId);
+      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+      for (ContainerID containerID : containerIDs) {
+        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+      }
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   @Override
-  public void closePipeline(PipelineID pipelineId) throws IOException {
+  public void openPipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      Pipeline pipeline =
-          stateManager.updatePipelineState(pipelineId, LifeCycleEvent.CLOSE);
-      pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
-          pipeline.getProtobufMessage().toByteArray());
+      stateManager.openPipeline(pipelineId);
     } finally {
       lock.writeLock().unlock();
     }
@@ -209,6 +234,7 @@ public class SCMPipelineManager implements PipelineManager {
     try {
       stateManager.removePipeline(pipelineID);
       pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
+      // TODO: remove pipeline from node manager
     } finally {
       lock.writeLock().unlock();
     }
@@ -216,11 +242,8 @@ public class SCMPipelineManager implements PipelineManager {
 
   @Override
   public void close() throws IOException {
-    lock.writeLock().lock();
-    try {
-      stateManager.close();
-    } finally {
-      lock.writeLock().unlock();
+    if (pipelineStore != null) {
+      pipelineStore.close();
     }
   }
 }
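
End-to-end sketch of the new flow. An assumption: createPipeline is the
interface method whose body the hunk above rewrites; conf, nodeManager and
eventQueue are caller-supplied.

    PipelineManager manager =
        new SCMPipelineManager(conf, nodeManager, eventQueue);
    Pipeline pipeline = manager.createPipeline(
        ReplicationType.RATIS, ReplicationFactor.THREE);
    // finalizePipeline closes the pipeline and fires CLOSE_CONTAINER for
    // each container still on it; once the containers are gone,
    // PipelineReportHandler destroys the Ratis ring, after which
    // removePipeline succeeds (it throws while containers remain).
    manager.finalizePipeline(pipeline.getID());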

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 56ffcd0..c95fcfb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -54,7 +54,7 @@ public class SimplePipelineProvider implements PipelineProvider {
     Collections.shuffle(dns);
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(LifeCycleState.ALLOCATED)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.STAND_ALONE)
         .setFactor(factor)
         .setNodes(dns.subList(0, factor.getNumber()))
@@ -71,7 +71,7 @@ public class SimplePipelineProvider implements PipelineProvider {
     }
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(LifeCycleState.ALLOCATED)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.STAND_ALONE)
         .setFactor(factor)
         .setNodes(nodes)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 24a16c7..21f00cd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -18,7 +18,11 @@ package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.mockito.Mockito;
 import static org.mockito.Mockito.when;
 
@@ -307,6 +311,19 @@ public final class TestUtils {
     return PipelineReportsProto.newBuilder().build();
   }
 
+  public static PipelineReportFromDatanode getRandomPipelineReportFromDatanode(
+      DatanodeDetails dn,
+      org.apache.hadoop.hdds.scm.pipeline.PipelineID... pipelineIDs) {
+    PipelineReportsProto.Builder reportBuilder =
+        PipelineReportsProto.newBuilder();
+    for (org.apache.hadoop.hdds.scm.pipeline.PipelineID pipelineID :
+        pipelineIDs) {
+      reportBuilder.addPipelineReport(
+          PipelineReport.newBuilder().setPipelineID(pipelineID.getProtobuf()));
+    }
+    return new PipelineReportFromDatanode(dn, reportBuilder.build());
+  }
+
   /**
    * Creates container report with the given ContainerInfo(s).
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index 0d4c461..49fb2bc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -48,15 +48,21 @@ public class TestPipelineStateManager {
   }
 
   private Pipeline createDummyPipeline(int numNodes) {
+    return createDummyPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, numNodes);
+  }
+
+  private Pipeline createDummyPipeline(HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor, int numNodes) {
     List<DatanodeDetails> nodes = new ArrayList<>();
     for (int i = 0; i < numNodes; i++) {
       nodes.add(TestUtils.randomDatanodeDetails());
     }
     return Pipeline.newBuilder()
-        .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE)
+        .setType(type)
+        .setFactor(factor)
         .setNodes(nodes)
-        .setState(HddsProtos.LifeCycleState.ALLOCATED)
+        .setState(Pipeline.PipelineState.ALLOCATED)
         .setId(PipelineID.randomId())
         .build();
   }
@@ -89,7 +95,7 @@ public class TestPipelineStateManager {
     Assert.assertTrue(pipeline == pipeline1);
 
     // clean up
-    stateManager.removePipeline(pipeline1.getID());
+    removePipeline(pipeline);
   }
 
   @Test
@@ -102,9 +108,63 @@ public class TestPipelineStateManager {
     pipelines.add(pipeline);
     stateManager.addPipeline(pipeline);
 
-    Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelines(
+    Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelinesByType(
         HddsProtos.ReplicationType.RATIS));
     Assert.assertEquals(pipelines, pipelines1);
+    // clean up
+    for (Pipeline pipeline1 : pipelines) {
+      removePipeline(pipeline1);
+    }
+  }
+
+  @Test
+  public void testGetPipelinesByTypeAndFactor() throws IOException {
+    Set<Pipeline> pipelines = new HashSet<>();
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+          .values()) {
+        for (int i = 0; i < 5; i++) {
+          // 5 pipelines in allocated state for each type and factor
+          Pipeline pipeline =
+              createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          pipelines.add(pipeline);
+
+          // 5 pipelines in open state for each type and factor
+          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          stateManager.openPipeline(pipeline.getID());
+          pipelines.add(pipeline);
+
+          // 5 pipelines in closed state for each type and factor
+          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          stateManager.finalizePipeline(pipeline.getID());
+          pipelines.add(pipeline);
+        }
+      }
+    }
+
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+          .values()) {
+        // verify pipelines received
+        List<Pipeline> pipelines1 =
+            stateManager.getPipelinesByTypeAndFactor(type, factor);
+        Assert.assertEquals(5, pipelines1.size());
+        pipelines1.stream().forEach(p -> {
+          Assert.assertEquals(p.getType(), type);
+          Assert.assertEquals(p.getFactor(), factor);
+        });
+      }
+    }
+
+    //clean up
+    for (Pipeline pipeline : pipelines) {
+      removePipeline(pipeline);
+    }
   }
 
   @Test
@@ -115,8 +175,8 @@ public class TestPipelineStateManager {
     pipeline = stateManager.getPipeline(pipeline.getID());
 
     try {
-      stateManager
-          .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+      stateManager.addContainerToPipeline(pipeline.getID(),
+          ContainerID.valueof(++containerID));
       Assert.fail("Container should not have been added");
     } catch (IOException e) {
       // add container possible only in container with open state
@@ -124,16 +184,15 @@ public class TestPipelineStateManager {
     }
 
     // move pipeline to open state
-    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
-        HddsProtos.LifeCycleEvent.CREATED);
+    stateManager.openPipeline(pipeline.getID());
 
     // add three containers
-    stateManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
-    stateManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
-    stateManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+    stateManager.addContainerToPipeline(pipeline.getID(),
+        ContainerID.valueof(containerID));
+    stateManager.addContainerToPipeline(pipeline.getID(),
+        ContainerID.valueof(++containerID));
+    stateManager.addContainerToPipeline(pipeline.getID(),
+        ContainerID.valueof(++containerID));
 
     //verify the number of containers returned
     Set<ContainerID> containerIDs =
@@ -142,8 +201,8 @@ public class TestPipelineStateManager {
 
     removePipeline(pipeline);
     try {
-      stateManager
-          .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+      stateManager.addContainerToPipeline(pipeline.getID(),
+          ContainerID.valueof(++containerID));
       Assert.fail("Container should not have been added");
     } catch (IOException e) {
       // Can not add a container to removed pipeline
@@ -155,8 +214,8 @@ public class TestPipelineStateManager {
   public void testRemovePipeline() throws IOException {
     Pipeline pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
-    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
-        HddsProtos.LifeCycleEvent.CREATED);
+    // open the pipeline
+    stateManager.openPipeline(pipeline.getID());
     stateManager
         .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
 
@@ -165,6 +224,17 @@ public class TestPipelineStateManager {
       Assert.fail("Pipeline should not have been removed");
     } catch (IOException e) {
      // can not remove a pipeline which is not yet closed
+      Assert.assertTrue(e.getMessage().contains("not yet closed"));
+    }
+
+    // close the pipeline
+    stateManager.finalizePipeline(pipeline.getID());
+
+    try {
+      stateManager.removePipeline(pipeline.getID());
+      Assert.fail("Pipeline should not have been removed");
+    } catch (IOException e) {
+      // can not remove a pipeline which still has containers
       Assert.assertTrue(e.getMessage().contains("not empty"));
     }
 
@@ -178,64 +248,87 @@ public class TestPipelineStateManager {
     Pipeline pipeline = createDummyPipeline(1);
     // create an open pipeline in stateMap
     stateManager.addPipeline(pipeline);
-    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
-        HddsProtos.LifeCycleEvent.CREATED);
+    stateManager.openPipeline(pipeline.getID());
 
-    stateManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
-    stateManager
-        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
-    // removeContainerFromPipeline in open pipeline does not lead to removal of pipeline
-    Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
+    stateManager.addContainerToPipeline(pipeline.getID(),
+        ContainerID.valueof(containerID));
+    Assert.assertEquals(1, stateManager.getContainers(pipeline.getID()).size());
+    stateManager.removeContainerFromPipeline(pipeline.getID(),
+        ContainerID.valueof(containerID));
+    Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size());
 
     // add two containers in the pipeline
-    stateManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
-    stateManager
-        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+    stateManager.addContainerToPipeline(pipeline.getID(),
+        ContainerID.valueof(++containerID));
+    stateManager.addContainerToPipeline(pipeline.getID(),
+        ContainerID.valueof(++containerID));
+    Assert.assertEquals(2, stateManager.getContainers(pipeline.getID()).size());
 
     // move pipeline to closing state
-    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.FINALIZE);
+    stateManager.finalizePipeline(pipeline.getID());
 
-    stateManager
-        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
-    // removal of second last container in closing or closed pipeline should
-    // not lead to removal of pipeline
-    Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
-    stateManager
-        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(--containerID));
-    // removal of last container in closing or closed pipeline should lead to
-    // removal of pipeline
-    try {
-      stateManager.getPipeline(pipeline.getID());
-      Assert.fail("getPipeline should have failed.");
-    } catch (IOException e) {
-      Assert.assertTrue(e.getMessage().contains(" not found"));
-    }
+    stateManager.removeContainerFromPipeline(pipeline.getID(),
+        ContainerID.valueof(containerID));
+    stateManager.removeContainerFromPipeline(pipeline.getID(),
+        ContainerID.valueof(--containerID));
+    Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size());
+
+    // clean up
+    stateManager.removePipeline(pipeline.getID());
   }
 
   @Test
-  public void testUpdatePipelineState() throws IOException {
+  public void testFinalizePipeline() throws IOException {
     Pipeline pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
-    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
-        HddsProtos.LifeCycleEvent.CREATED, HddsProtos.LifeCycleEvent.FINALIZE,
-        HddsProtos.LifeCycleEvent.CLOSE);
+    // finalize on ALLOCATED pipeline
+    stateManager.finalizePipeline(pipeline.getID());
+    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
+        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+    // clean up
+    removePipeline(pipeline);
+
+    pipeline = createDummyPipeline(1);
+    stateManager.addPipeline(pipeline);
+    stateManager.openPipeline(pipeline.getID());
+    // finalize on OPEN pipeline
+    stateManager.finalizePipeline(pipeline.getID());
+    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
+        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+    // clean up
+    removePipeline(pipeline);
 
     pipeline = createDummyPipeline(1);
     stateManager.addPipeline(pipeline);
-    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
-        HddsProtos.LifeCycleEvent.TIMEOUT);
+    stateManager.openPipeline(pipeline.getID());
+    stateManager.finalizePipeline(pipeline.getID());
+    // finalize should work on already closed pipeline
+    stateManager.finalizePipeline(pipeline.getID());
+    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
+        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+    // clean up
+    removePipeline(pipeline);
   }
 
-  private void updateEvents(PipelineID pipelineID,
-      HddsProtos.LifeCycleEvent... events) throws IOException {
-    for (HddsProtos.LifeCycleEvent event : events) {
-      stateManager.updatePipelineState(pipelineID, event);
-    }
+  @Test
+  public void testOpenPipeline() throws IOException {
+    Pipeline pipeline = createDummyPipeline(1);
+    stateManager.addPipeline(pipeline);
+    // open on ALLOCATED pipeline
+    stateManager.openPipeline(pipeline.getID());
+    Assert.assertEquals(Pipeline.PipelineState.OPEN,
+        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+
+    stateManager.openPipeline(pipeline.getID());
+    // open should work on already open pipeline
+    Assert.assertEquals(Pipeline.PipelineState.OPEN,
+        stateManager.getPipeline(pipeline.getID()).getPipelineState());
+    // clean up
+    removePipeline(pipeline);
   }
 
   private void removePipeline(Pipeline pipeline) throws IOException {
+    stateManager.finalizePipeline(pipeline.getID());
     Set<ContainerID> containerIDs =
         stateManager.getContainers(pipeline.getID());
     for (ContainerID containerID : containerIDs) {

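The tests above capture the simplified lifecycle that replaces the old
LifeCycleEvent-driven updates. As a minimal sketch (illustrative only, not part
of the patch; createDummyPipeline is the test helper from this diff):

    PipelineStateManager stateManager =
        new PipelineStateManager(new OzoneConfiguration());
    Pipeline pipeline = createDummyPipeline(1);        // starts ALLOCATED
    stateManager.addPipeline(pipeline);
    stateManager.openPipeline(pipeline.getID());       // ALLOCATED -> OPEN, idempotent
    stateManager.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
    stateManager.finalizePipeline(pipeline.getID());   // ALLOCATED or OPEN -> CLOSED, idempotent
    stateManager.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(1));
    stateManager.removePipeline(pipeline.getID());     // only once CLOSED and empty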
http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 6cf3e62..184143a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -47,7 +47,7 @@ public class TestRatisPipelineProvider {
     nodeManager = new MockNodeManager(true, 10);
     stateManager = new PipelineStateManager(new OzoneConfiguration());
     provider = new RatisPipelineProvider(nodeManager,
-        stateManager);
+        stateManager, new OzoneConfiguration());
   }
 
   @Test
@@ -57,8 +57,8 @@ public class TestRatisPipelineProvider {
     stateManager.addPipeline(pipeline);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
-    Assert.assertEquals(pipeline.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -70,8 +70,8 @@ public class TestRatisPipelineProvider {
             .isEmpty());
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
-    Assert.assertEquals(pipeline1.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline1.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -89,16 +89,16 @@ public class TestRatisPipelineProvider {
     Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
-    Assert.assertEquals(pipeline.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(
+        pipeline.getPipelineState(), Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
     pipeline = provider.create(createListOfNodes(factor.getNumber()));
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
-    Assert.assertEquals(pipeline.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
   }
 }
\ No newline at end of file

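Condensed, the provider contract these assertions check reads as follows (a
sketch under the same test setup; createListOfNodes is the helper used above):

    RatisPipelineProvider provider = new RatisPipelineProvider(
        nodeManager, stateManager, new OzoneConfiguration());
    Pipeline pipeline = provider.create(createListOfNodes(3));
    // every freshly created pipeline starts out ALLOCATED
    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
        pipeline.getPipelineState());
    Assert.assertEquals(HddsProtos.ReplicationType.RATIS, pipeline.getType());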
http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
new file mode 100644
index 0000000..0f9ad55
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestSCMContainerManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test cases to verify PipelineManager.
+ */
+public class TestSCMPipelineManager {
+  private static MockNodeManager nodeManager;
+  private static File testDir;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new OzoneConfiguration();
+    testDir = GenericTestUtils
+        .getTestDir(TestSCMContainerManager.class.getSimpleName());
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
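+    // pipelines created by SCMPipelineManager are persisted in a db under
+    // this metadata dir; testPipelineReload below relies on that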
+    boolean folderExisted = testDir.exists() || testDir.mkdirs();
+    if (!folderExisted) {
+      throw new IOException("Unable to create test directory path");
+    }
+    nodeManager = new MockNodeManager(true, 20);
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    FileUtil.fullyDelete(testDir);
+  }
+
+  @Test
+  public void testPipelineReload() throws IOException {
+    PipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+    Set<Pipeline> pipelines = new HashSet<>();
+    for (int i = 0; i < 5; i++) {
+      Pipeline pipeline = pipelineManager
+          .createPipeline(HddsProtos.ReplicationType.RATIS,
+              HddsProtos.ReplicationFactor.THREE);
+      pipelines.add(pipeline);
+    }
+    pipelineManager.close();
+
+    // new pipeline manager should be able to load the pipelines from the db
+    pipelineManager =
+        new SCMPipelineManager(conf, nodeManager,
+            new EventQueue());
+    List<Pipeline> pipelineList =
+        pipelineManager.getPipelinesByType(HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipelines, new HashSet<>(pipelineList));
+
+    // clean up
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.finalizePipeline(pipeline.getID());
+      pipelineManager.removePipeline(pipeline.getID());
+    }
+    pipelineManager.close();
+  }
+
+  @Test
+  public void testRemovePipeline() throws IOException {
+    PipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+    Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
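+    // walk the full lifecycle: open, add a container, finalize, empty, remove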
+    pipelineManager.openPipeline(pipeline.getID());
+    pipelineManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
+    pipelineManager.finalizePipeline(pipeline.getID());
+    pipelineManager
+        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(1));
+    pipelineManager.removePipeline(pipeline.getID());
+    pipelineManager.close();
+
+    // new pipeline manager should not be able to load removed pipelines
+    pipelineManager =
+        new SCMPipelineManager(conf, nodeManager,
+            new EventQueue());
+    try {
+      pipelineManager.getPipeline(pipeline.getID());
+      Assert.fail("Pipeline should not have been retrieved");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("not found"));
+    }
+
+    // clean up
+    pipelineManager.close();
+  }
+
+  @Test
+  public void testPipelineReport() throws IOException {
+    PipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+
+    // create a pipeline in allocated state with no dns yet reported
+    Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+    Assert
+        .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isHealthy());
+    Assert
+        .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isOpen());
+
+    // get pipeline report from each dn in the pipeline
+    PipelineReportHandler pipelineReportHandler =
+        new PipelineReportHandler(pipelineManager, conf);
+    for (DatanodeDetails dn: pipeline.getNodes()) {
+      PipelineReportFromDatanode pipelineReportFromDatanode =
+          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID());
+      // pipeline is not healthy until all dns report
+      Assert.assertFalse(
+          pipelineManager.getPipeline(pipeline.getID()).isHealthy());
+      pipelineReportHandler
+          .onMessage(pipelineReportFromDatanode, new EventQueue());
+    }
+
+    // pipeline is healthy when all dns report
+    Assert
+        .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isHealthy());
+    // pipeline should now move to open state
+    Assert
+        .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isOpen());
+
+    // close the pipeline
+    pipelineManager.finalizePipeline(pipeline.getID());
+
+    for (DatanodeDetails dn: pipeline.getNodes()) {
+      PipelineReportFromDatanode pipelineReportFromDatanode =
+          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID());
+      // pipeline report for a closed pipeline should destroy the pipeline
+      // and remove it from the pipeline manager
+      pipelineReportHandler
+          .onMessage(pipelineReportFromDatanode, new EventQueue());
+    }
+
+    try {
+      pipelineManager.getPipeline(pipeline.getID());
+      Assert.fail("Pipeline should not have been retrieved");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("not found"));
+    }
+
+    // clean up
+    pipelineManager.close();
+  }
+}

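testPipelineReport above is the core of the new report processing: a pipeline
opens only after every datanode in it has reported, and a report against a
finalized pipeline tears it down. Reduced to a sketch (same helpers as in the
test above):

    PipelineReportHandler handler = new PipelineReportHandler(pipelineManager, conf);
    for (DatanodeDetails dn : pipeline.getNodes()) {
      // each datanode's report marks it as reported; once all have reported,
      // the ALLOCATED pipeline becomes healthy and moves to OPEN
      handler.onMessage(
          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID()),
          new EventQueue());
    }
    // after finalizePipeline(), the same reports instead destroy the
    // pipeline and remove it from the manager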
http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a43c92/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
index 0f56cc8..b44dbef 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
@@ -56,8 +56,8 @@ public class TestSimplePipelineProvider {
     Assert.assertEquals(pipeline.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline.getFactor(), factor);
-    Assert.assertEquals(pipeline.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -66,8 +66,8 @@ public class TestSimplePipelineProvider {
     Assert.assertEquals(pipeline1.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline1.getFactor(), factor);
-    Assert.assertEquals(pipeline1.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline1.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -86,8 +86,8 @@ public class TestSimplePipelineProvider {
     Assert.assertEquals(pipeline.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline.getFactor(), factor);
-    Assert.assertEquals(pipeline.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -95,8 +95,8 @@ public class TestSimplePipelineProvider {
     Assert.assertEquals(pipeline.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline.getFactor(), factor);
-    Assert.assertEquals(pipeline.getLifeCycleState(),
-        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
   }
 }
\ No newline at end of file

