HDDS-192:Create new SCMCommand to request a replication of a container. 
Contributed by Elek Marton


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/238fe00a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/238fe00a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/238fe00a

Branch: refs/heads/HDDS-4
Commit: 238fe00ad2692154f6a382f35735169ee5e4af2c
Parents: 35ec940
Author: Bharat Viswanadham <bha...@apache.org>
Authored: Mon Jun 25 21:12:05 2018 -0700
Committer: Bharat Viswanadham <bha...@apache.org>
Committed: Mon Jun 25 21:12:05 2018 -0700

----------------------------------------------------------------------
 .../statemachine/DatanodeStateMachine.java      |  3 +
 .../ReplicateContainerCommandHandler.java       | 67 ++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java  | 12 +++
 .../commands/ReplicateContainerCommand.java     | 94 ++++++++++++++++++++
 .../StorageContainerDatanodeProtocol.proto      | 12 ++-
 .../scm/server/SCMDatanodeProtocolServer.java   | 11 +++
 .../TestReplicateContainerHandler.java          | 71 +++++++++++++++
 7 files changed, 269 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/238fe00a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index dc4e673..b073d7b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -32,6 +32,8 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .ReplicateContainerCommandHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -95,6 +97,7 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new CloseContainerCommandHandler())
         .addHandler(new DeleteBlocksCommandHandler(
             container.getContainerManager(), conf))
+        .addHandler(new ReplicateContainerCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238fe00a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
new file mode 100644
index 0000000..b4e83b7
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command handler to copy containers from sources.
+ */
+public class ReplicateContainerCommandHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
+
+  private int invocationCount;
+
+  private long totalTime;
+
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    LOG.warn("Replicate command is not yet handled");
+
+  }
+
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return Type.replicateContainerCommand;
+  }
+
+  @Override
+  public int getInvocationCount() {
+    return this.invocationCount;
+  }
+
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238fe00a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 1ee6375..260a245 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -196,6 +198,16 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(closeContainer);
         break;
+      case replicateContainerCommand:
+        ReplicateContainerCommand replicateContainerCommand =
+            ReplicateContainerCommand.getFromProtobuf(
+                commandResponseProto.getReplicateContainerCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM container replicate request for container 
{}",
+              replicateContainerCommand.getContainerID());
+        }
+        this.context.addCommand(replicateContainerCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238fe00a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
new file mode 100644
index 0000000..834318b
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto
+    .Builder;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * SCM command to request replication of a container.
+ */
+public class ReplicateContainerCommand
+    extends SCMCommand<ReplicateContainerCommandProto> {
+
+  private final long containerID;
+
+  private final List<DatanodeDetails> sourceDatanodes;
+
+  public ReplicateContainerCommand(long containerID,
+      List<DatanodeDetails> sourceDatanodes) {
+    this.containerID = containerID;
+    this.sourceDatanodes = sourceDatanodes;
+  }
+
+  @Override
+  public Type getType() {
+    return SCMCommandProto.Type.replicateContainerCommand;
+  }
+
+  @Override
+  public byte[] getProtoBufMessage() {
+    return getProto().toByteArray();
+  }
+
+  public ReplicateContainerCommandProto getProto() {
+    Builder builder = ReplicateContainerCommandProto.newBuilder()
+        .setContainerID(containerID);
+    for (DatanodeDetails dd : sourceDatanodes) {
+      builder.addSources(dd.getProtoBufMessage());
+    }
+    return builder.build();
+  }
+
+  public static ReplicateContainerCommand getFromProtobuf(
+      ReplicateContainerCommandProto protoMessage) {
+    Preconditions.checkNotNull(protoMessage);
+
+    List<DatanodeDetails> datanodeDetails =
+        protoMessage.getSourcesList()
+        .stream()
+        .map(DatanodeDetails::getFromProtoBuf)
+        .collect(Collectors.toList());
+
+    return new ReplicateContainerCommand(protoMessage.getContainerID(),
+        datanodeDetails);
+
+  }
+
+  public long getContainerID() {
+    return containerID;
+  }
+
+  public List<DatanodeDetails> getSourceDatanodes() {
+    return sourceDatanodes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238fe00a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
 
b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index f6aba05..54230c1 100644
--- 
a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ 
b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -172,6 +172,7 @@ message SCMCommandProto {
     deleteBlocksCommand = 2;
     closeContainerCommand = 3;
     deleteContainerCommand = 4;
+    replicateContainerCommand = 5;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -179,6 +180,7 @@ message SCMCommandProto {
   optional DeleteBlocksCommandProto deleteBlocksCommandProto = 3;
   optional CloseContainerCommandProto closeContainerCommandProto = 4;
   optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
+  optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
 }
 
 /**
@@ -227,13 +229,21 @@ message CloseContainerCommandProto {
 }
 
 /**
-This command asks the datanode to close a specific container.
+This command asks the datanode to delete a specific container.
 */
 message DeleteContainerCommandProto {
   required int64 containerID = 1;
 }
 
 /**
+This command asks the datanode to replicate a container from specific sources.
+*/
+message ReplicateContainerCommandProto {
+  required int64 containerID = 1;
+  repeated DatanodeDetailsProto sources = 2;
+}
+
+/**
  * Protocol used from a datanode to StorageContainerManager.
  *
  * Please see the request and response messages for details of the RPC calls.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238fe00a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 7d16161..eb5ce1a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -63,6 +64,9 @@ import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto
     .Type.deleteBlocksCommand;
 import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .replicateContainerCommand;
+import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto
     .Type.reregisterCommand;
 
@@ -77,6 +81,7 @@ import 
org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB
@@ -293,6 +298,12 @@ public class SCMDatanodeProtocolServer implements
           .setCloseContainerCommandProto(
               ((CloseContainerCommand) cmd).getProto())
           .build();
+    case replicateContainerCommand:
+      return builder
+          .setCommandType(replicateContainerCommand)
+          .setReplicateContainerCommandProto(
+              ((ReplicateContainerCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Not implemented");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/238fe00a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
new file mode 100644
index 0000000..a5b101f
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_SIZE_GB;
+import org.junit.Test;
+
+/**
+ * Tests the behavior of the datanode, when replicate container command is
+ * received.
+ */
+public class TestReplicateContainerHandler {
+
+  @Test
+  public void test() throws IOException, TimeoutException, 
InterruptedException,
+      OzoneException {
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(ReplicateContainerCommandHandler.LOG);
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OZONE_SCM_CONTAINER_SIZE_GB, "1");
+    MiniOzoneCluster cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+    cluster.waitForClusterToBeReady();
+
+    DatanodeDetails datanodeDetails =
+        cluster.getHddsDatanodes().get(0).getDatanodeDetails();
+    //send the order to replicate the container
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .addDatanodeCommand(datanodeDetails.getUuid(),
+            new ReplicateContainerCommand(1L,
+                new ArrayList<>()));
+
+    //TODO: here we test only the serialization/unserialization as
+    // the implementation is not yet done
+    GenericTestUtils
+        .waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 
500,
+            5 * 1000);
+
+  }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to