Revert "HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh."
This reverts commit 6837121a43231f854b0b22ad20330012439313ce. (Mixed with HDDS-260) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2acf8d5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2acf8d5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2acf8d5 Branch: refs/heads/HADOOP-15461 Commit: d2acf8d560950f06ffbf5c217fbfab76cd70d5da Parents: c7ae556 Author: Xiaoyu Yao <x...@apache.org> Authored: Fri Jul 20 14:20:18 2018 -0700 Committer: Xiaoyu Yao <x...@apache.org> Committed: Fri Jul 20 14:20:18 2018 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 - .../scm/container/common/helpers/Pipeline.java | 7 - .../common/src/main/resources/ozone-default.xml | 12 - .../common/statemachine/StateContext.java | 52 +--- .../states/endpoint/HeartbeatEndpointTask.java | 24 +- .../StorageContainerDatanodeProtocol.proto | 4 +- .../common/report/TestReportPublisher.java | 41 +++ .../endpoint/TestHeartbeatEndpointTask.java | 302 ------------------- .../common/states/endpoint/package-info.java | 18 -- .../hdds/scm/container/ContainerMapping.java | 4 - .../hdds/scm/exceptions/SCMException.java | 1 - .../hdds/scm/pipelines/PipelineManager.java | 64 ++-- .../hdds/scm/pipelines/PipelineSelector.java | 212 ++----------- .../scm/pipelines/ratis/RatisManagerImpl.java | 33 +- .../standalone/StandaloneManagerImpl.java | 21 +- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 14 - 16 files changed, 146 insertions(+), 668 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 6e940ad..71184cf 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -236,11 +236,6 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s"; - public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT = - "ozone.scm.pipeline.creation.lease.timeout"; - - public static final String - OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s"; public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY = "ozone.scm.block.deletion.max.retry"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java index 534c9fd..c5794f4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -214,13 +214,6 @@ public class Pipeline { } /** - * Update the State of the pipeline. - */ - public void setLifeCycleState(HddsProtos.LifeCycleState nextState) { - lifeCycleState = nextState; - } - - /** * Gets the pipeline Name. 
* * @return - Name of the pipeline http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 69a382a..5a1d26a 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1085,17 +1085,5 @@ executed since last report. Unit could be defined with postfix (ns,ms,s,m,h,d)</description> </property> - <property> - <name>ozone.scm.pipeline.creation.lease.timeout</name> - <value>60s</value> - <tag>OZONE, SCM, PIPELINE</tag> - <description> - Pipeline creation timeout in milliseconds to be used by SCM. When - BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to - CREATING state, SCM will now wait for the configured amount of time - to get COMPLETE_CREATE event if it doesn't receive it will move the - pipeline to DELETING. 
- </description> - </property> </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 4951f2a..faaff69 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -20,18 +20,14 @@ import com.google.protobuf.GeneratedMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerAction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .InitDatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .RunningDatanodeState; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus - .CommandStatusBuilder; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +43,6 @@ import 
java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; @@ -64,7 +59,6 @@ public class StateContext { private final AtomicLong stateExecutionCount; private final Configuration conf; private final Queue<GeneratedMessage> reports; - private final Queue<ContainerAction> containerActions; private DatanodeStateMachine.DatanodeStates state; /** @@ -82,7 +76,6 @@ public class StateContext { commandQueue = new LinkedList<>(); cmdStatusMap = new ConcurrentHashMap<>(); reports = new LinkedList<>(); - containerActions = new LinkedList<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); } @@ -205,47 +198,6 @@ public class StateContext { return results; } - - /** - * Adds the ContainerAction to ContainerAction queue. - * - * @param containerAction ContainerAction to be added - */ - public void addContainerAction(ContainerAction containerAction) { - synchronized (containerActions) { - containerActions.add(containerAction); - } - } - - /** - * Returns all the pending ContainerActions from the ContainerAction queue, - * or empty list if the queue is empty. - * - * @return List<ContainerAction> - */ - public List<ContainerAction> getAllPendingContainerActions() { - return getPendingContainerAction(Integer.MAX_VALUE); - } - - /** - * Returns pending ContainerActions from the ContainerAction queue with a - * max limit on list size, or empty list if the queue is empty. 
- * - * @return List<ContainerAction> - */ - public List<ContainerAction> getPendingContainerAction(int maxLimit) { - List<ContainerAction> results = new ArrayList<>(); - synchronized (containerActions) { - containerActions.parallelStream().limit(maxLimit).collect(Collectors.toList()); - ContainerAction action = containerActions.poll(); - while(results.size() < maxLimit && action != null) { - results.add(action); - action = containerActions.poll(); - } - } - return results; - } - /** * Returns the next task to get executed by the datanode state machine. * @return A callable that will be executed by the http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 214e1cd..260a245 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -25,10 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerActionsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerAction; -import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import 
org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; @@ -50,7 +46,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.ZonedDateTime; -import java.util.List; import java.util.concurrent.Callable; /** @@ -112,7 +107,7 @@ public class HeartbeatEndpointTask SCMHeartbeatRequestProto.newBuilder() .setDatanodeDetails(datanodeDetailsProto); addReports(requestBuilder); - addContainerActions(requestBuilder); + SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() .sendHeartbeat(requestBuilder.build()); processResponse(reponse, datanodeDetailsProto); @@ -145,23 +140,6 @@ public class HeartbeatEndpointTask } /** - * Adds all the pending ContainerActions to the heartbeat. - * - * @param requestBuilder builder to which the report has to be added. - */ - private void addContainerActions( - SCMHeartbeatRequestProto.Builder requestBuilder) { - List<ContainerAction> actions = context.getAllPendingContainerActions(); - if (!actions.isEmpty()) { - ContainerActionsProto cap = ContainerActionsProto.newBuilder() - .addAllContainerActions(actions) - .build(); - requestBuilder.setContainerActions(cap); - } - } - - - /** * Returns a builder class for HeartbeatEndpointTask task. * @return Builder. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index d89567b..4238389 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto { required DatanodeDetailsProto datanodeDetails = 1; optional NodeReportProto nodeReport = 2; optional ContainerReportsProto containerReport = 3; - optional CommandStatusReportsProto commandStatusReport = 4; - optional ContainerActionsProto containerActions = 5; + optional ContainerActionsProto containerActions = 4; + optional CommandStatusReportsProto commandStatusReport = 5; } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java index 811599f..a0db2e8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.report; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.Descriptors; 
import com.google.protobuf.GeneratedMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -27,8 +28,14 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CommandStatus.Status; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -171,6 +178,22 @@ public class TestReportPublisher { executorService.shutdown(); } + @Test + public void testAddingReportToHeartbeat() { + GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance(); + GeneratedMessage containerReport = ContainerReportsProto + .getDefaultInstance(); + SCMHeartbeatRequestProto.Builder heartbeatBuilder = + SCMHeartbeatRequestProto.newBuilder(); + heartbeatBuilder.setDatanodeDetails( + getDatanodeDetails().getProtoBufMessage()); + addReport(heartbeatBuilder, nodeReport); + addReport(heartbeatBuilder, containerReport); + SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build(); + Assert.assertTrue(heartbeat.hasNodeReport()); + Assert.assertTrue(heartbeat.hasContainerReport()); + } + /** * Get a datanode details. * @@ -199,4 +222,22 @@ public class TestReportPublisher { return builder.build(); } + /** + * Adds the report to heartbeat. + * + * @param requestBuilder builder to which the report has to be added. + * @param report the report to be added. 
+ */ + private static void addReport(SCMHeartbeatRequestProto.Builder + requestBuilder, GeneratedMessage report) { + String reportName = report.getDescriptorForType().getFullName(); + for (Descriptors.FieldDescriptor descriptor : + SCMHeartbeatRequestProto.getDescriptor().getFields()) { + String heartbeatFieldName = descriptor.getMessageType().getFullName(); + if (heartbeatFieldName.equals(reportName)) { + requestBuilder.setField(descriptor, report); + } + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java deleted file mode 100644 index 87bd811..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ /dev/null @@ -1,302 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.common.states.endpoint; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerAction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.ozone.container.common.statemachine - .DatanodeStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - .DatanodeStateMachine.DatanodeStates; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.protocolPB - .StorageContainerDatanodeProtocolClientSideTranslatorPB; - -import org.junit.Assert; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; - -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; - -/** - * This class tests the functionality of HeartbeatEndpointTask. 
- */ -public class TestHeartbeatEndpointTask { - - - @Test - public void testheartbeatWithoutReports() throws Exception { - StorageContainerDatanodeProtocolClientSideTranslatorPB scm = - Mockito.mock( - StorageContainerDatanodeProtocolClientSideTranslatorPB.class); - ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor - .forClass(SCMHeartbeatRequestProto.class); - Mockito.when(scm.sendHeartbeat(argument.capture())) - .thenAnswer(invocation -> - SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID( - ((SCMHeartbeatRequestProto)invocation.getArgument(0)) - .getDatanodeDetails().getUuid()) - .build()); - - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm); - endpointTask.call(); - SCMHeartbeatRequestProto heartbeat = argument.getValue(); - Assert.assertTrue(heartbeat.hasDatanodeDetails()); - Assert.assertFalse(heartbeat.hasNodeReport()); - Assert.assertFalse(heartbeat.hasContainerReport()); - Assert.assertFalse(heartbeat.hasCommandStatusReport()); - Assert.assertFalse(heartbeat.hasContainerActions()); - } - - @Test - public void testheartbeatWithNodeReports() throws Exception { - Configuration conf = new OzoneConfiguration(); - StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - Mockito.mock(DatanodeStateMachine.class)); - - StorageContainerDatanodeProtocolClientSideTranslatorPB scm = - Mockito.mock( - StorageContainerDatanodeProtocolClientSideTranslatorPB.class); - ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor - .forClass(SCMHeartbeatRequestProto.class); - Mockito.when(scm.sendHeartbeat(argument.capture())) - .thenAnswer(invocation -> - SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID( - ((SCMHeartbeatRequestProto)invocation.getArgument(0)) - .getDatanodeDetails().getUuid()) - .build()); - - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); - context.addReport(NodeReportProto.getDefaultInstance()); - endpointTask.call(); - 
SCMHeartbeatRequestProto heartbeat = argument.getValue(); - Assert.assertTrue(heartbeat.hasDatanodeDetails()); - Assert.assertTrue(heartbeat.hasNodeReport()); - Assert.assertFalse(heartbeat.hasContainerReport()); - Assert.assertFalse(heartbeat.hasCommandStatusReport()); - Assert.assertFalse(heartbeat.hasContainerActions()); - } - - @Test - public void testheartbeatWithContainerReports() throws Exception { - Configuration conf = new OzoneConfiguration(); - StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - Mockito.mock(DatanodeStateMachine.class)); - - StorageContainerDatanodeProtocolClientSideTranslatorPB scm = - Mockito.mock( - StorageContainerDatanodeProtocolClientSideTranslatorPB.class); - ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor - .forClass(SCMHeartbeatRequestProto.class); - Mockito.when(scm.sendHeartbeat(argument.capture())) - .thenAnswer(invocation -> - SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID( - ((SCMHeartbeatRequestProto)invocation.getArgument(0)) - .getDatanodeDetails().getUuid()) - .build()); - - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); - context.addReport(ContainerReportsProto.getDefaultInstance()); - endpointTask.call(); - SCMHeartbeatRequestProto heartbeat = argument.getValue(); - Assert.assertTrue(heartbeat.hasDatanodeDetails()); - Assert.assertFalse(heartbeat.hasNodeReport()); - Assert.assertTrue(heartbeat.hasContainerReport()); - Assert.assertFalse(heartbeat.hasCommandStatusReport()); - Assert.assertFalse(heartbeat.hasContainerActions()); - } - - @Test - public void testheartbeatWithCommandStatusReports() throws Exception { - Configuration conf = new OzoneConfiguration(); - StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - Mockito.mock(DatanodeStateMachine.class)); - - StorageContainerDatanodeProtocolClientSideTranslatorPB scm = - Mockito.mock( - StorageContainerDatanodeProtocolClientSideTranslatorPB.class); - 
ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor - .forClass(SCMHeartbeatRequestProto.class); - Mockito.when(scm.sendHeartbeat(argument.capture())) - .thenAnswer(invocation -> - SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID( - ((SCMHeartbeatRequestProto)invocation.getArgument(0)) - .getDatanodeDetails().getUuid()) - .build()); - - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); - context.addReport(CommandStatusReportsProto.getDefaultInstance()); - endpointTask.call(); - SCMHeartbeatRequestProto heartbeat = argument.getValue(); - Assert.assertTrue(heartbeat.hasDatanodeDetails()); - Assert.assertFalse(heartbeat.hasNodeReport()); - Assert.assertFalse(heartbeat.hasContainerReport()); - Assert.assertTrue(heartbeat.hasCommandStatusReport()); - Assert.assertFalse(heartbeat.hasContainerActions()); - } - - @Test - public void testheartbeatWithContainerActions() throws Exception { - Configuration conf = new OzoneConfiguration(); - StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - Mockito.mock(DatanodeStateMachine.class)); - - StorageContainerDatanodeProtocolClientSideTranslatorPB scm = - Mockito.mock( - StorageContainerDatanodeProtocolClientSideTranslatorPB.class); - ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor - .forClass(SCMHeartbeatRequestProto.class); - Mockito.when(scm.sendHeartbeat(argument.capture())) - .thenAnswer(invocation -> - SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID( - ((SCMHeartbeatRequestProto)invocation.getArgument(0)) - .getDatanodeDetails().getUuid()) - .build()); - - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); - context.addContainerAction(getContainerAction()); - endpointTask.call(); - SCMHeartbeatRequestProto heartbeat = argument.getValue(); - Assert.assertTrue(heartbeat.hasDatanodeDetails()); - Assert.assertFalse(heartbeat.hasNodeReport()); - 
Assert.assertFalse(heartbeat.hasContainerReport()); - Assert.assertFalse(heartbeat.hasCommandStatusReport()); - Assert.assertTrue(heartbeat.hasContainerActions()); - } - - @Test - public void testheartbeatWithAllReports() throws Exception { - Configuration conf = new OzoneConfiguration(); - StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - Mockito.mock(DatanodeStateMachine.class)); - - StorageContainerDatanodeProtocolClientSideTranslatorPB scm = - Mockito.mock( - StorageContainerDatanodeProtocolClientSideTranslatorPB.class); - ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor - .forClass(SCMHeartbeatRequestProto.class); - Mockito.when(scm.sendHeartbeat(argument.capture())) - .thenAnswer(invocation -> - SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID( - ((SCMHeartbeatRequestProto)invocation.getArgument(0)) - .getDatanodeDetails().getUuid()) - .build()); - - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); - context.addReport(NodeReportProto.getDefaultInstance()); - context.addReport(ContainerReportsProto.getDefaultInstance()); - context.addReport(CommandStatusReportsProto.getDefaultInstance()); - context.addContainerAction(getContainerAction()); - endpointTask.call(); - SCMHeartbeatRequestProto heartbeat = argument.getValue(); - Assert.assertTrue(heartbeat.hasDatanodeDetails()); - Assert.assertTrue(heartbeat.hasNodeReport()); - Assert.assertTrue(heartbeat.hasContainerReport()); - Assert.assertTrue(heartbeat.hasCommandStatusReport()); - Assert.assertTrue(heartbeat.hasContainerActions()); - } - - /** - * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy. 
- * - * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB - * - * @return HeartbeatEndpointTask - */ - private HeartbeatEndpointTask getHeartbeatEndpointTask( - StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) { - Configuration conf = new OzoneConfiguration(); - StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - Mockito.mock(DatanodeStateMachine.class)); - return getHeartbeatEndpointTask(conf, context, proxy); - - } - - /** - * Creates HeartbeatEndpointTask with the given conf, context and - * StorageContainerManager client side proxy. - * - * @param conf Configuration - * @param context StateContext - * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB - * - * @return HeartbeatEndpointTask - */ - private HeartbeatEndpointTask getHeartbeatEndpointTask( - Configuration conf, - StateContext context, - StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) { - DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder() - .setUuid(UUID.randomUUID().toString()) - .setHostName("localhost") - .setIpAddress("127.0.0.1") - .build(); - EndpointStateMachine endpointStateMachine = Mockito - .mock(EndpointStateMachine.class); - Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy); - return HeartbeatEndpointTask.newBuilder() - .setConfig(conf) - .setDatanodeDetails(datanodeDetails) - .setContext(context) - .setEndpointStateMachine(endpointStateMachine) - .build(); - } - - private ContainerAction getContainerAction() { - ContainerAction.Builder builder = ContainerAction.newBuilder(); - ContainerInfo containerInfo = ContainerInfo.newBuilder() - .setContainerID(1L) - .build(); - builder.setContainer(containerInfo) - .setAction(ContainerAction.Action.CLOSE) - .setReason(ContainerAction.Reason.CONTAINER_FULL); - return builder.build(); - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java deleted file mode 100644 index d120a5c..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.container.common.states.endpoint; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index f07d22b..26f4d86 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -658,10 +658,6 @@ public class ContainerMapping implements Mapping { if (containerStore != null) { containerStore.close(); } - - if (pipelineSelector != null) { - pipelineSelector.shutdown(); - } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index 0085542..d7d70ef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -107,7 +107,6 @@ public class SCMException extends IOException { FAILED_TO_LOAD_OPEN_CONTAINER, FAILED_TO_ALLOCATE_CONTAINER, FAILED_TO_CHANGE_CONTAINER_STATE, - FAILED_TO_CHANGE_PIPELINE_STATE, CONTAINER_EXISTS, FAILED_TO_FIND_CONTAINER, FAILED_TO_FIND_CONTAINER_WITH_SPACE, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index 77d8211..a041973 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -59,16 +59,41 @@ public abstract class PipelineManager { * @return a Pipeline. */ public synchronized final Pipeline getPipeline( - ReplicationFactor replicationFactor, ReplicationType replicationType) { - Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor); + ReplicationFactor replicationFactor, ReplicationType replicationType) + throws IOException { + /** + * In the Ozone world, we have a very simple policy. + * + * 1. Try to create a pipeline if there are enough free nodes. + * + * 2. This allows all nodes to part of a pipeline quickly. + * + * 3. if there are not enough free nodes, return pipeline in a + * round-robin fashion. + * + * TODO: Might have to come up with a better algorithm than this. + * Create a new placement policy that returns pipelines in round robin + * fashion. 
+ */ + Pipeline pipeline = allocatePipeline(replicationFactor); if (pipeline != null) { - LOG.debug("re-used pipeline:{} for container with " + + LOG.debug("created new pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", pipeline.getPipelineName(), replicationType, replicationFactor); + activePipelines.add(pipeline); + activePipelineMap.put(pipeline.getPipelineName(), pipeline); + node2PipelineMap.addPipeline(pipeline); + } else { + pipeline = findOpenPipeline(replicationType, replicationFactor); + if (pipeline != null) { + LOG.debug("re-used pipeline:{} for container with " + + "replicationType:{} replicationFactor:{}", + pipeline.getPipelineName(), replicationType, replicationFactor); + } } if (pipeline == null) { LOG.error("Get pipeline call failed. We are not able to find" + - " operational pipeline."); + "free nodes or operational pipeline."); return null; } else { return pipeline; @@ -84,7 +109,7 @@ public abstract class PipelineManager { public synchronized final Pipeline getPipeline(String pipelineName) { Pipeline pipeline = null; - // 1. Check if pipeline already exists + // 1. Check if pipeline channel already exists if (activePipelineMap.containsKey(pipelineName)) { pipeline = activePipelineMap.get(pipelineName); LOG.debug("Returning pipeline for pipelineName:{}", pipelineName); @@ -107,13 +132,7 @@ public abstract class PipelineManager { } public abstract Pipeline allocatePipeline( - ReplicationFactor replicationFactor); - - /** - * Initialize the pipeline - * TODO: move the initialization to Ozone Client later - */ - public abstract void initializePipeline(Pipeline pipeline) throws IOException; + ReplicationFactor replicationFactor) throws IOException; public void removePipeline(Pipeline pipeline) { activePipelines.remove(pipeline); @@ -160,23 +179,12 @@ public abstract class PipelineManager { } /** - * Creates a pipeline with a specified replication factor and type. - * @param replicationFactor - Replication Factor. 
- * @param replicationType - Replication Type. + * Creates a pipeline from a specified set of Nodes. + * @param pipelineID - Name of the pipeline + * @param datanodes - The list of datanodes that make this pipeline. */ - public Pipeline createPipeline(ReplicationFactor replicationFactor, - ReplicationType replicationType) throws IOException { - Pipeline pipeline = allocatePipeline(replicationFactor); - if (pipeline != null) { - LOG.debug("created new pipeline:{} for container with " - + "replicationType:{} replicationFactor:{}", - pipeline.getPipelineName(), replicationType, replicationFactor); - activePipelines.add(pipeline); - activePipelineMap.put(pipeline.getPipelineName(), pipeline); - node2PipelineMap.addPipeline(pipeline); - } - return pipeline; - } + public abstract void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) throws IOException; /** * Close the pipeline with the given clusterId. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index 08710e7..2955af5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms .SCMContainerPlacementRandom; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl; import 
org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl; @@ -34,28 +33,17 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.common.statemachine - .InvalidStateTransitionException; -import org.apache.hadoop.ozone.common.statemachine.StateMachine; -import org.apache.hadoop.ozone.lease.Lease; -import org.apache.hadoop.ozone.lease.LeaseException; -import org.apache.hadoop.ozone.lease.LeaseManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_CHANGE_PIPELINE_STATE; - /** * Sends the request to the right pipeline manager. */ @@ -69,10 +57,6 @@ public class PipelineSelector { private final StandaloneManagerImpl standaloneManager; private final long containerSize; private final Node2PipelineMap node2PipelineMap; - private final LeaseManager<Pipeline> pipelineLeaseManager; - private final StateMachine<LifeCycleState, - HddsProtos.LifeCycleEvent> stateMachine; - /** * Constructs a pipeline Selector. * @@ -93,74 +77,6 @@ public class PipelineSelector { this.ratisManager = new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, conf, node2PipelineMap); - // Initialize the container state machine. 
- Set<HddsProtos.LifeCycleState> finalStates = new HashSet(); - long pipelineCreationLeaseTimeout = conf.getTimeDuration( - ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT, - ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - LOG.trace("Starting Pipeline Lease Manager."); - pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout); - pipelineLeaseManager.start(); - - // These are the steady states of a container. - finalStates.add(HddsProtos.LifeCycleState.OPEN); - finalStates.add(HddsProtos.LifeCycleState.CLOSED); - - this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED, - finalStates); - initializeStateMachine(); - } - - /** - * Event and State Transition Mapping: - * - * State: ALLOCATED ---------------> CREATING - * Event: CREATE - * - * State: CREATING ---------------> OPEN - * Event: CREATED - * - * State: OPEN ---------------> CLOSING - * Event: FINALIZE - * - * State: CLOSING ---------------> CLOSED - * Event: CLOSE - * - * State: CREATING ---------------> CLOSED - * Event: TIMEOUT - * - * - * Container State Flow: - * - * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING] - * (CREATE) | (CREATED) (FINALIZE) | - * | | - * | | - * |(TIMEOUT) |(CLOSE) - * | | - * +--------> [CLOSED] <--------+ - */ - private void initializeStateMachine() { - stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED, - HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleEvent.CREATE); - - stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, - HddsProtos.LifeCycleState.OPEN, - HddsProtos.LifeCycleEvent.CREATED); - - stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN, - HddsProtos.LifeCycleState.CLOSING, - HddsProtos.LifeCycleEvent.FINALIZE); - - stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING, - HddsProtos.LifeCycleState.CLOSED, - HddsProtos.LifeCycleEvent.CLOSE); - - stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING, - 
HddsProtos.LifeCycleState.CLOSED, - HddsProtos.LifeCycleEvent.TIMEOUT); } /** @@ -172,14 +88,15 @@ public class PipelineSelector { * @return pipeline corresponding to nodes */ public static Pipeline newPipelineFromNodes( - List<DatanodeDetails> nodes, ReplicationType replicationType, - ReplicationFactor replicationFactor, String name) { + List<DatanodeDetails> nodes, LifeCycleState state, + ReplicationType replicationType, ReplicationFactor replicationFactor, + String name) { Preconditions.checkNotNull(nodes); Preconditions.checkArgument(nodes.size() > 0); String leaderId = nodes.get(0).getUuidString(); - // A new pipeline always starts in allocated state - Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED, - replicationType, replicationFactor, name); + Pipeline + pipeline = new Pipeline(leaderId, state, replicationType, + replicationFactor, name); for (DatanodeDetails node : nodes) { pipeline.addMember(node); } @@ -258,35 +175,8 @@ public class PipelineSelector { LOG.debug("Getting replication pipeline forReplicationType {} :" + " ReplicationFactor {}", replicationType.toString(), replicationFactor.toString()); - - /** - * In the Ozone world, we have a very simple policy. - * - * 1. Try to create a pipeline if there are enough free nodes. - * - * 2. This allows all nodes to part of a pipeline quickly. - * - * 3. if there are not enough free nodes, return already allocated pipeline - * in a round-robin fashion. - * - * TODO: Might have to come up with a better algorithm than this. - * Create a new placement policy that returns pipelines in round robin - * fashion. 
- */ - Pipeline pipeline = - manager.createPipeline(replicationFactor, replicationType); - if (pipeline == null) { - // try to return a pipeline from already allocated pipelines - pipeline = manager.getPipeline(replicationFactor, replicationType); - } else { - // if a new pipeline is created, initialize its state machine - updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE); - - //TODO: move the initialization of pipeline to Ozone Client - manager.initializePipeline(pipeline); - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED); - } - return pipeline; + return manager. + getPipeline(replicationFactor, replicationType); } /** @@ -304,6 +194,19 @@ public class PipelineSelector { " pipelineName:{}", replicationType, pipelineName); return manager.getPipeline(pipelineName); } + /** + * Creates a pipeline from a specified set of Nodes. + */ + + public void createPipeline(ReplicationType replicationType, String + pipelineID, List<DatanodeDetails> datanodes) throws IOException { + PipelineManager manager = getPipelineManager(replicationType); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID, + datanodes.stream().map(DatanodeDetails::toString) + .collect(Collectors.joining(","))); + manager.createPipeline(pipelineID, datanodes); + } /** * Close the pipeline with the given clusterId. @@ -348,77 +251,12 @@ public class PipelineSelector { } public void removePipeline(UUID dnId) { - Set<Pipeline> pipelineSet = + Set<Pipeline> pipelineChannelSet = node2PipelineMap.getPipelines(dnId); - for (Pipeline pipeline : pipelineSet) { - getPipelineManager(pipeline.getType()) - .removePipeline(pipeline); + for (Pipeline pipelineChannel : pipelineChannelSet) { + getPipelineManager(pipelineChannel.getType()) + .removePipeline(pipelineChannel); } node2PipelineMap.removeDatanode(dnId); } - - /** - * Update the Pipeline State to the next state. 
- * - * @param pipeline - Pipeline - * @param event - LifeCycle Event - * @throws SCMException on Failure. - */ - public void updatePipelineState(Pipeline pipeline, - HddsProtos.LifeCycleEvent event) throws IOException { - HddsProtos.LifeCycleState newState; - try { - newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event); - } catch (InvalidStateTransitionException ex) { - String error = String.format("Failed to update pipeline state %s, " + - "reason: invalid state transition from state: %s upon " + - "event: %s.", - pipeline.getPipelineName(), pipeline.getLifeCycleState(), event); - LOG.error(error); - throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE); - } - - // This is a post condition after executing getNextState. - Preconditions.checkNotNull(newState); - Preconditions.checkNotNull(pipeline); - try { - switch (event) { - case CREATE: - // Acquire lease on pipeline - Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline); - // Register callback to be executed in case of timeout - pipelineLease.registerCallBack(() -> { - updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT); - return null; - }); - break; - case CREATED: - // Release the lease on pipeline - pipelineLeaseManager.release(pipeline); - break; - - case FINALIZE: - //TODO: cleanup pipeline by closing all the containers on the pipeline - break; - - case CLOSE: - case TIMEOUT: - // TODO: Release the nodes here when pipelines are destroyed - break; - default: - throw new SCMException("Unsupported pipeline LifeCycleEvent.", - FAILED_TO_CHANGE_PIPELINE_STATE); - } - - pipeline.setLifeCycleState(newState); - } catch (LeaseException e) { - throw new IOException("Lease Exception.", e); - } - } - - public void shutdown() { - if (pipelineLeaseManager != null) { - pipelineLeaseManager.shutdown(); - } - } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java index c726ef6..a8f8b20 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -71,7 +72,7 @@ public class RatisManagerImpl extends PipelineManager { * Allocates a new ratis Pipeline from the free nodes. * * @param factor - One or Three - * @return Pipeline. + * @return PipelineChannel. */ public Pipeline allocatePipeline(ReplicationFactor factor) { List<DatanodeDetails> newNodesList = new LinkedList<>(); @@ -88,23 +89,35 @@ public class RatisManagerImpl extends PipelineManager { // further allocations ratisMembers.addAll(newNodesList); LOG.info("Allocating a new ratis pipeline of size: {}", count); - // Start all pipeline names with "Ratis", easy to grep the logs. + // Start all channel names with "Ratis", easy to grep the logs. 
String pipelineName = PREFIX + UUID.randomUUID().toString().substring(PREFIX.length()); - return PipelineSelector.newPipelineFromNodes(newNodesList, - ReplicationType.RATIS, factor, pipelineName); + Pipeline pipeline= + PipelineSelector.newPipelineFromNodes(newNodesList, + LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName); + try (XceiverClientRatis client = + XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { + client.createPipeline(pipeline.getPipelineName(), newNodesList); + } catch (IOException e) { + return null; + } + return pipeline; } } } return null; } - public void initializePipeline(Pipeline pipeline) throws IOException { - //TODO:move the initialization from SCM to client - try (XceiverClientRatis client = - XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines()); - } + /** + * Creates a pipeline from a specified set of Nodes. + * + * @param pipelineID - Name of the pipeline + * @param datanodes - The list of datanodes that make this pipeline. 
+ */ + @Override + public void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) { + } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java index bb4951f..cf691bf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap; import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -85,19 +86,29 @@ public class StandaloneManagerImpl extends PipelineManager { // once a datanode has been added to a pipeline, exclude it from // further allocations standAloneMembers.addAll(newNodesList); - LOG.info("Allocating a new standalone pipeline of size: {}", count); - String pipelineName = + LOG.info("Allocating a new standalone pipeline channel of size: {}", + count); + String channelName = "SA-" + UUID.randomUUID().toString().substring(3); return PipelineSelector.newPipelineFromNodes(newNodesList, - ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName); + 
LifeCycleState.OPEN, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE, channelName); } } } return null; } - public void initializePipeline(Pipeline pipeline) { - // Nothing to be done for standalone pipeline + /** + * Creates a pipeline from a specified set of Nodes. + * + * @param pipelineID - Name of the pipeline + * @param datanodes - The list of datanodes that make this pipeline. + */ + @Override + public void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) { + //return newPipelineFromNodes(datanodes, pipelineID); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index ffac6d5..bc3505f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -26,8 +26,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.junit.AfterClass; @@ -53,7 +51,6 @@ public class TestNode2PipelineMap { private static ContainerWithPipeline ratisContainer; private static ContainerStateMap stateMap; private static ContainerMapping mapping; - private static 
PipelineSelector pipelineSelector; /** * Create a MiniDFSCluster for testing. @@ -69,7 +66,6 @@ public class TestNode2PipelineMap { mapping = (ContainerMapping)scm.getScmContainerManager(); stateMap = mapping.getStateManager().getContainerStateMap(); ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner"); - pipelineSelector = mapping.getPipelineSelector(); } /** @@ -117,15 +113,5 @@ public class TestNode2PipelineMap { NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline( ratisContainer.getPipeline().getPipelineName()); Assert.assertEquals(0, set2.size()); - - try { - pipelineSelector.updatePipelineState(ratisContainer.getPipeline(), - HddsProtos.LifeCycleEvent.CLOSE); - Assert.fail("closing of pipeline without finalize should fail"); - } catch (Exception e) { - Assert.assertTrue(e instanceof SCMException); - Assert.assertEquals(((SCMException)e).getResult(), - SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE); - } } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org