This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cf47339128 HDDS-9735. Datanodes to retry close pipeline commands until
pipeline is removed. (#5643)
cf47339128 is described below
commit cf47339128a38e71805760fe120577a61b61682e
Author: SaketaChalamchala <[email protected]>
AuthorDate: Wed Nov 22 06:30:30 2023 -0800
HDDS-9735. Datanodes to retry close pipeline commands until pipeline is
removed. (#5643)
---
.../common/statemachine/StateContext.java | 18 ++++-
.../common/statemachine/TestStateContext.java | 91 +++++++++++++++++++++-
2 files changed, 104 insertions(+), 5 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index b29db88890..13f7ad6150 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -515,8 +515,6 @@ public class StateContext {
int size = actions.size();
int limit = size > maxLimit ? maxLimit : size;
for (int count = 0; count < limit; count++) {
- // we need to remove the action from the containerAction queue
- // as well
ContainerAction action = actions.poll();
Preconditions.checkNotNull(action);
containerActionList.add(action);
@@ -580,6 +578,7 @@ public class StateContext {
InetSocketAddress endpoint,
int maxLimit) {
List<PipelineAction> pipelineActionList = new ArrayList<>();
+ List<PipelineAction> persistPipelineAction = new ArrayList<>();
synchronized (pipelineActions) {
if (!pipelineActions.isEmpty() &&
CollectionUtils.isNotEmpty(pipelineActions.get(endpoint))) {
@@ -588,8 +587,21 @@ public class StateContext {
int size = actionsForEndpoint.size();
int limit = size > maxLimit ? maxLimit : size;
for (int count = 0; count < limit; count++) {
- pipelineActionList.add(actionsForEndpoint.poll());
+ // Add closePipeline back to the pipelineAction queue until
+ // pipeline is closed and removed from the DN.
+ PipelineAction action = actionsForEndpoint.poll();
+ if (action.hasClosePipeline()) {
+ if (parentDatanodeStateMachine.getContainer().getPipelineReport()
+ .getPipelineReportList().stream().noneMatch(
+ report -> action.getClosePipeline().getPipelineID()
+ .equals(report.getPipelineID()))) {
+ continue;
+ }
+ persistPipelineAction.add(action);
+ }
+ pipelineActionList.add(action);
}
+ actionsForEndpoint.addAll(persistPipelineAction);
}
return pipelineActionList;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 2b20fbce7a..117ddd5b4e 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.ozone.container.common.statemachine;
import static
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
import static org.apache.ozone.test.GenericTestUtils.waitFor;
@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -359,11 +362,94 @@ public class TestStateContext {
assertFalse(stateContext.getAllAvailableReports(scm1).isEmpty());
}
+ @Test
+ public void testClosePipelineActions() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachineMock =
+ mock(DatanodeStateMachine.class);
+ OzoneContainer container = mock(OzoneContainer.class);
+ HddsProtos.PipelineID pipelineIDProto = PipelineID.randomId().getProtobuf();
+
+ // Mock DN PipelineReport
+ // Report pipelineId the first two times.
+ // Return empty pipeline report the third time to mock the pipeline being
+ // closed and removed from the DN.
+ PipelineReportsProto.Builder
+ pipelineReportsProtoBuilder = PipelineReportsProto.newBuilder();
+ pipelineReportsProtoBuilder.addAllPipelineReport(Collections.singletonList(
+ PipelineReport.newBuilder()
+ .setPipelineID(pipelineIDProto)
+ .setIsLeader(false)
+ .build()));
+ PipelineReportsProto pipelineReportsProto =
+ pipelineReportsProtoBuilder.build();
+
+ StorageContainerDatanodeProtocolProtos.PipelineReportsProto.Builder
+ emptyPipelineReportsBuilder = PipelineReportsProto.newBuilder();
+ emptyPipelineReportsBuilder.addAllPipelineReport(Collections.emptyList());
+
+ when(container.getPipelineReport()).thenReturn(pipelineReportsProto,
+ pipelineReportsProto, emptyPipelineReportsBuilder.build());
+ when(datanodeStateMachineMock.getContainer()).thenReturn(container);
+
+ StateContext stateContext = new StateContext(conf,
+ DatanodeStates.getInitState(), datanodeStateMachineMock, "");
+
+ InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+
+ // Add SCM endpoint.
+ stateContext.addEndpoint(scm1);
+
+ final ClosePipelineInfo closePipelineInfo = ClosePipelineInfo.newBuilder()
+ .setPipelineID(pipelineIDProto)
+ .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
+ .setDetailedReason("Test").build();
+ final PipelineAction pipelineAction = PipelineAction.newBuilder()
+ .setClosePipeline(closePipelineInfo)
+ .setAction(PipelineAction.Action.CLOSE)
+ .build();
+
+ // Add PipelineAction. Should be added to all endpoints.
+ stateContext.addPipelineActionIfAbsent(pipelineAction);
+
+ // Get pipeline actions for scm1.
+ List<PipelineAction> pipelineActions =
+ stateContext.getPendingPipelineAction(scm1, 10);
+ assertEquals(1, pipelineActions.size());
+
+ // Ensure that the close pipeline action is not dequeued from scm1 since
+ // DN reports the pipelineID.
+ pipelineActions = stateContext.getPendingPipelineAction(scm1, 10);
+ assertEquals(1, pipelineActions.size());
+
+ // Ensure that the pipeline action is dequeued from scm1 when
+ // the DN closes and removes the pipeline
+ pipelineActions = stateContext.getPendingPipelineAction(scm1, 10);
+ assertEquals(0, pipelineActions.size());
+ }
+
@Test
public void testActionAPIs() {
OzoneConfiguration conf = new OzoneConfiguration();
DatanodeStateMachine datanodeStateMachineMock =
mock(DatanodeStateMachine.class);
+ OzoneContainer container = mock(OzoneContainer.class);
+ HddsProtos.PipelineID pipelineIDProto = PipelineID.randomId().getProtobuf();
+
+ // Mock DN PipelineReport
+ PipelineReportsProto.Builder
+ pipelineReportsProtoBuilder = PipelineReportsProto.newBuilder();
+ pipelineReportsProtoBuilder.addAllPipelineReport(Collections.singletonList(
+ PipelineReport.newBuilder()
+ .setPipelineID(pipelineIDProto)
+ .setIsLeader(false)
+ .build()));
+ PipelineReportsProto pipelineReportsProto =
+ pipelineReportsProtoBuilder.build();
+
+ when(container.getPipelineReport()).thenReturn(pipelineReportsProto);
+ when(datanodeStateMachineMock.getContainer()).thenReturn(container);
+
StateContext stateContext = new StateContext(conf,
DatanodeStates.getInitState(), datanodeStateMachineMock, "");
@@ -385,7 +471,7 @@ public class TestStateContext {
stateContext.addEndpoint(scm2);
final ClosePipelineInfo closePipelineInfo = ClosePipelineInfo.newBuilder()
- .setPipelineID(PipelineID.randomId().getProtobuf())
+ .setPipelineID(pipelineIDProto)
.setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
.setDetailedReason("Test").build();
final PipelineAction pipelineAction = PipelineAction.newBuilder()
@@ -398,7 +484,8 @@ public class TestStateContext {
pipelineActions = stateContext.getPendingPipelineAction(scm2, 10);
assertEquals(1, pipelineActions.size());
- // The pipeline action is dequeued from scm2 now, but still in scm1
+ // The pipeline action will not be dequeued from scm2
+ // until it is removed from the DN.
// The same pipeline action will not be added if it already exists
stateContext.addPipelineActionIfAbsent(pipelineAction);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]