This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cf47339128 HDDS-9735. Datanodes to retry close pipeline commands until 
pipeline is removed. (#5643)
cf47339128 is described below

commit cf47339128a38e71805760fe120577a61b61682e
Author: SaketaChalamchala <[email protected]>
AuthorDate: Wed Nov 22 06:30:30 2023 -0800

    HDDS-9735. Datanodes to retry close pipeline commands until pipeline is 
removed. (#5643)
---
 .../common/statemachine/StateContext.java          | 18 ++++-
 .../common/statemachine/TestStateContext.java      | 91 +++++++++++++++++++++-
 2 files changed, 104 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index b29db88890..13f7ad6150 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -515,8 +515,6 @@ public class StateContext {
         int size = actions.size();
         int limit = size > maxLimit ? maxLimit : size;
         for (int count = 0; count < limit; count++) {
-          // we need to remove the action from the containerAction queue
-          // as well
           ContainerAction action = actions.poll();
           Preconditions.checkNotNull(action);
           containerActionList.add(action);
@@ -580,6 +578,7 @@ public class StateContext {
       InetSocketAddress endpoint,
       int maxLimit) {
     List<PipelineAction> pipelineActionList = new ArrayList<>();
+    List<PipelineAction> persistPipelineAction = new ArrayList<>();
     synchronized (pipelineActions) {
       if (!pipelineActions.isEmpty() &&
           CollectionUtils.isNotEmpty(pipelineActions.get(endpoint))) {
@@ -588,8 +587,21 @@ public class StateContext {
         int size = actionsForEndpoint.size();
         int limit = size > maxLimit ? maxLimit : size;
         for (int count = 0; count < limit; count++) {
-          pipelineActionList.add(actionsForEndpoint.poll());
+          // Add closePipeline back to the pipelineAction queue until
+          // pipeline is closed and removed from the DN.
+          PipelineAction action = actionsForEndpoint.poll();
+          if (action.hasClosePipeline()) {
+            if (parentDatanodeStateMachine.getContainer().getPipelineReport()
+                .getPipelineReportList().stream().noneMatch(
+                    report -> action.getClosePipeline().getPipelineID()
+                        .equals(report.getPipelineID()))) {
+              continue;
+            }
+            persistPipelineAction.add(action);
+          }
+          pipelineActionList.add(action);
         }
+        actionsForEndpoint.addAll(persistPipelineAction);
       }
       return pipelineActionList;
     }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 2b20fbce7a..117ddd5b4e 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.ozone.container.common.statemachine;
 
 import static 
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
 import static org.apache.ozone.test.GenericTestUtils.waitFor;
@@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -359,11 +362,94 @@ public class TestStateContext {
     assertFalse(stateContext.getAllAvailableReports(scm1).isEmpty());
   }
 
+  @Test
+  public void testClosePipelineActions() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeStateMachine datanodeStateMachineMock =
+        mock(DatanodeStateMachine.class);
+    OzoneContainer container = mock(OzoneContainer.class);
+    HddsProtos.PipelineID pipelineIDProto = 
PipelineID.randomId().getProtobuf();
+
+    // Mock DN PipelineReport
+    // Report pipelineId the first two times.
+    // Return empty pipeline report the third time to mock the pipeline being
+    // closed and removed from the DN.
+    PipelineReportsProto.Builder
+        pipelineReportsProtoBuilder = PipelineReportsProto.newBuilder();
+    pipelineReportsProtoBuilder.addAllPipelineReport(Collections.singletonList(
+        PipelineReport.newBuilder()
+            .setPipelineID(pipelineIDProto)
+            .setIsLeader(false)
+            .build()));
+    PipelineReportsProto pipelineReportsProto =
+        pipelineReportsProtoBuilder.build();
+
+    StorageContainerDatanodeProtocolProtos.PipelineReportsProto.Builder
+        emptyPipelineReportsBuilder = PipelineReportsProto.newBuilder();
+    emptyPipelineReportsBuilder.addAllPipelineReport(Collections.emptyList());
+
+    when(container.getPipelineReport()).thenReturn(pipelineReportsProto,
+        pipelineReportsProto, emptyPipelineReportsBuilder.build());
+    when(datanodeStateMachineMock.getContainer()).thenReturn(container);
+
+    StateContext stateContext = new StateContext(conf,
+        DatanodeStates.getInitState(), datanodeStateMachineMock, "");
+
+    InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+
+    // Add SCM endpoint.
+    stateContext.addEndpoint(scm1);
+
+    final ClosePipelineInfo closePipelineInfo = ClosePipelineInfo.newBuilder()
+        .setPipelineID(pipelineIDProto)
+        .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
+        .setDetailedReason("Test").build();
+    final PipelineAction pipelineAction = PipelineAction.newBuilder()
+        .setClosePipeline(closePipelineInfo)
+        .setAction(PipelineAction.Action.CLOSE)
+        .build();
+
+    // Add PipelineAction. Should be added to all endpoints.
+    stateContext.addPipelineActionIfAbsent(pipelineAction);
+
+    // Get pipeline actions for scm1.
+    List<PipelineAction> pipelineActions =
+        stateContext.getPendingPipelineAction(scm1, 10);
+    assertEquals(1, pipelineActions.size());
+
+    // Ensure that the close pipeline action is not dequeued from scm1 since
+    // the DN reports the pipelineID.
+    pipelineActions = stateContext.getPendingPipelineAction(scm1, 10);
+    assertEquals(1, pipelineActions.size());
+
+    // Ensure that the pipeline action is dequeued from scm1 when
+    // the DN closes and removes the pipeline
+    pipelineActions = stateContext.getPendingPipelineAction(scm1, 10);
+    assertEquals(0, pipelineActions.size());
+  }
+
   @Test
   public void testActionAPIs() {
     OzoneConfiguration conf = new OzoneConfiguration();
     DatanodeStateMachine datanodeStateMachineMock =
         mock(DatanodeStateMachine.class);
+    OzoneContainer container = mock(OzoneContainer.class);
+    HddsProtos.PipelineID pipelineIDProto = 
PipelineID.randomId().getProtobuf();
+
+    // Mock DN PipelineReport
+    PipelineReportsProto.Builder
+        pipelineReportsProtoBuilder = PipelineReportsProto.newBuilder();
+    pipelineReportsProtoBuilder.addAllPipelineReport(Collections.singletonList(
+        PipelineReport.newBuilder()
+            .setPipelineID(pipelineIDProto)
+            .setIsLeader(false)
+            .build()));
+    PipelineReportsProto pipelineReportsProto =
+        pipelineReportsProtoBuilder.build();
+
+    when(container.getPipelineReport()).thenReturn(pipelineReportsProto);
+    when(datanodeStateMachineMock.getContainer()).thenReturn(container);
+
     StateContext stateContext = new StateContext(conf,
         DatanodeStates.getInitState(), datanodeStateMachineMock, "");
 
@@ -385,7 +471,7 @@ public class TestStateContext {
     stateContext.addEndpoint(scm2);
 
     final ClosePipelineInfo closePipelineInfo = ClosePipelineInfo.newBuilder()
-        .setPipelineID(PipelineID.randomId().getProtobuf())
+        .setPipelineID(pipelineIDProto)
         .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
         .setDetailedReason("Test").build();
     final PipelineAction pipelineAction = PipelineAction.newBuilder()
@@ -398,7 +484,8 @@ public class TestStateContext {
 
     pipelineActions = stateContext.getPendingPipelineAction(scm2, 10);
     assertEquals(1, pipelineActions.size());
-    // The pipeline action is dequeued from scm2 now, but still in scm1
+    // The pipeline action will not be dequeued from scm2
+    // until it is removed from the DN.
 
     // The same pipeline action will not be added if it already exists
     stateContext.addPipelineActionIfAbsent(pipelineAction);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to