HDDS-304. Process ContainerAction from datanode heartbeat in SCM. Contributed by Nanda Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c368575 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c368575 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c368575 Branch: refs/heads/YARN-7402 Commit: 7c368575a319f5ba98019418166524bac982086f Parents: 97870ec Author: Mukul Kumar Singh <msi...@apache.org> Authored: Thu Aug 2 17:34:17 2018 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Thu Aug 2 17:34:17 2018 +0530 ---------------------------------------------------------------------- .../scm/container/ContainerActionsHandler.java | 60 +++++++++++++++++ .../hadoop/hdds/scm/events/SCMEvents.java | 16 ++++- .../server/SCMDatanodeHeartbeatDispatcher.java | 22 +++++++ .../scm/server/StorageContainerManager.java | 3 + .../container/TestContainerActionsHandler.java | 68 ++++++++++++++++++++ 5 files changed, 168 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c368575/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java new file mode 100644 index 0000000..ce399eb --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerActionsFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles container reports from datanode. + */ +public class ContainerActionsHandler implements + EventHandler<ContainerActionsFromDatanode> { + + private static final Logger LOG = LoggerFactory.getLogger( + ContainerActionsHandler.class); + + @Override + public void onMessage( + ContainerActionsFromDatanode containerReportFromDatanode, + EventPublisher publisher) { + DatanodeDetails dd = containerReportFromDatanode.getDatanodeDetails(); + for (ContainerAction action : containerReportFromDatanode.getReport() + .getContainerActionsList()) { + ContainerID containerId = ContainerID.valueof(action.getContainerID()); + switch (action.getAction()) { + case CLOSE: + LOG.debug("Closing container {} in datanode {} because the" + + " container is {}.", containerId, dd, action.getReason()); + publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerId); + break; + default: + LOG.warn("Invalid action {} with reason {}, from datanode {}. ", + action.getAction(), action.getReason(), dd); } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c368575/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index ad1702b..d49dd4f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -20,9 +20,16 @@ package org.apache.hadoop.hdds.scm.events; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.*; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CloseContainerStatus; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .DeleteBlockCommandStatus; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .ReplicationStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerActionsFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .CommandStatusReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; @@ -57,6 +64,13 @@ public final class SCMEvents { new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report"); /** + * ContainerActions are sent by Datanode. This event is received by + * SCMDatanodeHeartbeatDispatcher and CONTAINER_ACTIONS event is generated. + */ + public static final TypedEvent<ContainerActionsFromDatanode> + CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class, + "Container_Actions"); + /** * A Command status report will be sent by datanodes. This repoort is received * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c368575/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index 2461d37..c259141 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.server; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto @@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.util.List; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT; @@ -89,6 +92,13 @@ public final class SCMDatanodeHeartbeatDispatcher { } + if (heartbeat.hasContainerActions()) { + LOG.debug("Dispatching Container Actions."); + eventPublisher.fireEvent(CONTAINER_ACTIONS, + new ContainerActionsFromDatanode(datanodeDetails, + heartbeat.getContainerActions())); + } + if (heartbeat.hasCommandStatusReport()) { eventPublisher.fireEvent(CMD_STATUS_REPORT, new CommandStatusReportFromDatanode(datanodeDetails, @@ -146,6 +156,18 @@ public final class SCMDatanodeHeartbeatDispatcher { } /** + * Container action event payload with origin. + */ + public static class ContainerActionsFromDatanode + extends ReportFromDatanode<ContainerActionsProto> { + + public ContainerActionsFromDatanode(DatanodeDetails datanodeDetails, + ContainerActionsProto actions) { + super(datanodeDetails, actions); + } + } + + /** * Container report event payload with origin. */ public static class CommandStatusReportFromDatanode http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c368575/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index be8fb43..9cb1318 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; +import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.Mapping; @@ -209,10 +210,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap); + ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); + eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler); eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler); eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c368575/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java new file mode 100644 index 0000000..0997e1f --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerActionsHandler.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerActionsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerActionsFromDatanode; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests ContainerActionsHandler. + */ +public class TestContainerActionsHandler { + + @Test + public void testCloseContainerAction() { + EventQueue queue = new EventQueue(); + ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); + CloseContainerEventHandler closeContainerEventHandler = Mockito.mock( + CloseContainerEventHandler.class); + queue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerEventHandler); + queue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); + + ContainerAction action = ContainerAction.newBuilder() + .setContainerID(1L) + .setAction(ContainerAction.Action.CLOSE) + .setReason(ContainerAction.Reason.CONTAINER_FULL) + .build(); + + ContainerActionsProto cap = ContainerActionsProto.newBuilder() + .addContainerActions(action) + .build(); + + ContainerActionsFromDatanode containerActions = + new ContainerActionsFromDatanode( + TestUtils.randomDatanodeDetails(), cap); + + queue.fireEvent(SCMEvents.CONTAINER_ACTIONS, containerActions); + + verify(closeContainerEventHandler, times(1)) + .onMessage(ContainerID.valueof(1L), queue); + + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org