HDDS-127. Add CloseContainerEventHandler in SCM.
Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/78761e87
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/78761e87
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/78761e87

Branch: refs/heads/HDFS-12943
Commit: 78761e87a7f3012ef2d96e294d55b323b76b7c42
Parents: ba303b1
Author: Anu Engineer <aengin...@apache.org>
Authored: Thu Jun 7 14:35:22 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Thu Jun 7 14:35:22 2018 -0700

----------------------------------------------------------------------
 .../container/CloseContainerEventHandler.java   |  83 +++++++++
 .../hdds/scm/container/ContainerMapping.java    |   5 +
 .../scm/container/ContainerStateManager.java    |   9 +
 .../hadoop/hdds/scm/container/Mapping.java      |   6 +
 .../TestCloseContainerEventHandler.java         | 177 +++++++++++++++++++
 5 files changed, 280 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/78761e87/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
new file mode 100644
index 0000000..bc95b55
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In case of a node failure, volume failure, volume out of space, node
+ * out of space etc, CLOSE_CONTAINER_EVENT will be triggered.
+ * CloseContainerEventHandler is the handler for CLOSE_CONTAINER_EVENT.
+ * When a close container event is fired, a close command for the container
+ * is queued for every datanode in the pipeline and containerStateManager
+ * is asked to move the container from OPEN to CLOSING.
+ */
+public class CloseContainerEventHandler implements EventHandler<ContainerID> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(CloseContainerEventHandler.class);
+
+  /** Event type this handler is meant to be registered for. */
+  public static final TypedEvent<ContainerID> CLOSE_CONTAINER_EVENT =
+            new TypedEvent<>(ContainerID.class);
+
+  private final Mapping containerManager;
+
+  /**
+   * @param containerManager mapping used to reach the container state
+   *                         manager and the node manager.
+   */
+  public CloseContainerEventHandler(Mapping containerManager) {
+    this.containerManager = containerManager;
+  }
+
+  /**
+   * Handles a close request for a single container. Unknown containers and
+   * containers that are not in OPEN state are only logged.
+   *
+   * @param containerID id of the container to close
+   * @param publisher event publisher; not used by this handler
+   */
+  @Override
+  public void onMessage(ContainerID containerID, EventPublisher publisher) {
+
+    LOG.info("Close container Event triggered for container : {}",
+        containerID.getId());
+    ContainerStateManager stateManager = containerManager.getStateManager();
+    ContainerInfo info = stateManager.getContainer(containerID);
+    if (info == null) {
+      LOG.info("Container with id : {} does not exist", containerID.getId());
+      return;
+    }
+    if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
+      for (DatanodeDetails datanode : info.getPipeline().getMachines()) {
+        containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
+            new CloseContainerCommand(containerID.getId()));
+      }
+      try {
+        // Finalize event will make sure the state of the container transitions
+        // from OPEN to CLOSING in containerStateManager.
+        stateManager
+            .updateContainerState(info, HddsProtos.LifeCycleEvent.FINALIZE);
+      } catch (SCMException ex) {
+        // Pass the id as a log argument so the {} placeholder is filled, and
+        // hand the exception to the logger so the cause is not lost.
+        LOG.error("Failed to update the container state for container : {}",
+            containerID.getId(), ex);
+      }
+    } else {
+      LOG.info("container with id : {} is in {} state and need not be closed.",
+          containerID.getId(), info.getState());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78761e87/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index f5fe46a..b961c38 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -588,6 +588,11 @@ public class ContainerMapping implements Mapping {
     }
   }
 
+  /**
+   * Returns the nodeManager reference held by this ContainerMapping.
+   * @return NodeManager
+   */
+  @Override
+  public NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
   @VisibleForTesting
   public MetadataStore getContainerStore() {
     return containerStore;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78761e87/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 4895b78..9dfa660 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -445,6 +445,15 @@ public class ContainerStateManager implements Closeable {
         factor, type);
   }
 
+  /**
+   * Returns the containerInfo for the given container id.
+   * @param containerID id of the container
+   * @return ContainerInfo looked up by {@code containerID.getId()}; callers
+   *         such as CloseContainerEventHandler null-check the result
+   */
+  public ContainerInfo getContainer(ContainerID containerID) {
+    return containers.getContainerInfo(containerID.getId());
+  }
   @Override
   public void close() throws IOException {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78761e87/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index ee8e344..ab42520 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -21,6 +21,7 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -103,4 +104,9 @@ public interface Mapping extends Closeable {
                                ContainerReportsProto reports)
       throws IOException;
 
+  /**
+   * Returns the NodeManager exposed by this Mapping implementation.
+   * CloseContainerEventHandler uses it to queue close commands on datanodes.
+   *
+   * @return NodeManager
+   */
+  NodeManager getNodeManager();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/78761e87/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
new file mode 100644
index 0000000..09ade3e
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATED;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
+
+/**
+ * Tests the closeContainerEventHandler class.
+ *
+ * The mapping, node manager and event queue are static and therefore shared
+ * by all test methods, so assertions on datanode command counts are made
+ * relative to a baseline captured inside each test.
+ */
+public class TestCloseContainerEventHandler {
+
+  private static Configuration configuration;
+  private static MockNodeManager nodeManager;
+  private static ContainerMapping mapping;
+  private static long size;
+  private static File testDir;
+  private static EventQueue eventQueue;
+
+  /**
+   * Creates the shared ContainerMapping on top of a MockNodeManager and
+   * registers a CloseContainerEventHandler on a fresh EventQueue.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    configuration = SCMTestUtils.getConf();
+    size = configuration
+        .getLong(OZONE_SCM_CONTAINER_SIZE_GB, OZONE_SCM_CONTAINER_SIZE_DEFAULT)
+        * 1024 * 1024 * 1024;
+    testDir = GenericTestUtils
+        .getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
+    configuration
+        .set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    nodeManager = new MockNodeManager(true, 10);
+    mapping = new ContainerMapping(configuration, nodeManager, 128);
+    eventQueue = new EventQueue();
+    eventQueue.addHandler(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
+        new CloseContainerEventHandler(mapping));
+  }
+
+  /**
+   * Closes the shared mapping and removes the test metadata directory.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (mapping != null) {
+      mapping.close();
+    }
+    FileUtil.fullyDelete(testDir);
+  }
+
+  /**
+   * Firing CLOSE_CONTAINER_EVENT must reach the handler; this is detected
+   * through the handler's log output.
+   */
+  @Test
+  public void testIfCloseContainerEventHadnlerInvoked() {
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(CloseContainerEventHandler.LOG);
+    eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
+        new ContainerID(Math.abs(new Random().nextLong())));
+    eventQueue.processAll(1000);
+    Assert.assertTrue(logCapturer.getOutput()
+        .contains("Close container Event triggered for container"));
+  }
+
+  /**
+   * A close event for a random container id (expected to be unknown to the
+   * mapping) must be logged as a non-existent container.
+   */
+  @Test
+  public void testCloseContainerEventWithInvalidContainer() {
+    long id = Math.abs(new Random().nextLong());
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(CloseContainerEventHandler.LOG);
+    eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
+        new ContainerID(id));
+    eventQueue.processAll(1000);
+    Assert.assertTrue(logCapturer.getOutput()
+        .contains("Container with id : " + id + " does not exist"));
+  }
+
+  /**
+   * For a STAND_ALONE / factor ONE container: a close event in ALLOCATED
+   * state queues nothing, and a close event after the container is moved
+   * through CREATE and CREATED queues one command on the pipeline leader
+   * and leaves the container in CLOSING state.
+   */
+  @Test
+  public void testCloseContainerEventWithValidContainers() throws IOException {
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(CloseContainerEventHandler.LOG);
+    ContainerInfo info = mapping
+        .allocateContainer(HddsProtos.ReplicationType.STAND_ALONE,
+            HddsProtos.ReplicationFactor.ONE, "ozone");
+    ContainerID id = new ContainerID(info.getContainerID());
+    DatanodeDetails datanode = info.getPipeline().getLeader();
+    int closeCount = nodeManager.getCommandCount(datanode);
+    eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
+    eventQueue.processAll(1000);
+    // At this point of time, the allocated container is not in open
+    // state, so firing close container event should not queue CLOSE
+    // command in the Datanode. Compare against the baseline rather than 0:
+    // the node manager is shared, so another test may already have queued
+    // commands on this datanode.
+    Assert.assertEquals(closeCount, nodeManager.getCommandCount(datanode));
+    // Make sure the information is logged
+    Assert.assertTrue(logCapturer.getOutput().contains(
+        "container with id : " + id.getId()
+            + " is in ALLOCATED state and need not be closed"));
+    //Execute these state transitions so that we can close the container.
+    mapping.updateContainerState(id.getId(), CREATE);
+    mapping.updateContainerState(id.getId(), CREATED);
+    eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
+        new ContainerID(info.getContainerID()));
+    eventQueue.processAll(1000);
+    Assert.assertEquals(closeCount + 1, nodeManager.getCommandCount(datanode));
+    Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,
+        mapping.getStateManager().getContainer(id).getState());
+  }
+
+  /**
+   * Same scenario as the stand-alone test, but for a RATIS / factor THREE
+   * container: the close command must be queued on every datanode of the
+   * pipeline.
+   */
+  @Test
+  public void testCloseContainerEventWithRatis() throws IOException {
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(CloseContainerEventHandler.LOG);
+    ContainerInfo info = mapping
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, "ozone");
+    ContainerID id = new ContainerID(info.getContainerID());
+    int[] closeCount = new int[3];
+    int i = 0;
+    // Capture the per-datanode baseline BEFORE firing the event; capturing
+    // it afterwards would make the "nothing was queued" check vacuous.
+    for (DatanodeDetails details : info.getPipeline().getMachines()) {
+      closeCount[i] = nodeManager.getCommandCount(details);
+      i++;
+    }
+    eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
+    eventQueue.processAll(1000);
+    i = 0;
+    // The container is still ALLOCATED, so no command may have been queued.
+    for (DatanodeDetails details : info.getPipeline().getMachines()) {
+      Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details));
+      i++;
+    }
+    // Make sure the information is logged
+    Assert.assertTrue(logCapturer.getOutput().contains(
+        "container with id : " + id.getId()
+            + " is in ALLOCATED state and need not be closed"));
+    //Execute these state transitions so that we can close the container.
+    mapping.updateContainerState(id.getId(), CREATE);
+    mapping.updateContainerState(id.getId(), CREATED);
+    eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
+        new ContainerID(info.getContainerID()));
+    eventQueue.processAll(1000);
+    i = 0;
+    // Make sure close is queued for each datanode on the pipeline
+    for (DatanodeDetails details : info.getPipeline().getMachines()) {
+      Assert.assertEquals(closeCount[i] + 1,
+          nodeManager.getCommandCount(details));
+      Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,
+          mapping.getStateManager().getContainer(id).getState());
+      i++;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to