This is an automated email from the ASF dual-hosted git repository.

ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 87c394568e HDDS-10875. XceiverRatisServer#getRaftPeersInPipeline should be called before XceiverRatisServer#removeGroup (#6696)
87c394568e is described below

commit 87c394568e68c7f640e0185f3c64303d4b5994b4
Author: Ivan Andika <[email protected]>
AuthorDate: Tue May 21 11:52:22 2024 +0800

    HDDS-10875. XceiverRatisServer#getRaftPeersInPipeline should be called before XceiverRatisServer#removeGroup (#6696)
---
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  7 +++++
 .../ClosePipelineCommandHandler.java               | 32 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 7 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index bcea4d0193..5288c0bf50 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -61,6 +61,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RoutingTable;
+import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -244,6 +245,12 @@ public final class RatisHelper {
         RatisHelper.createRetryPolicy(conf), tlsConfig, conf);
   }
 
+  public static BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> 
newRaftClientNoRetry(
+      ConfigurationSource conf) {
+    return (leader, tlsConfig) -> newRaftClient(getRpcType(conf), leader,
+        RetryPolicies.noRetry(), tlsConfig, conf);
+  }
+
   public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
       ConfigurationSource configuration) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 5242c8686d..241abb6f4a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
@@ -68,7 +69,7 @@ public class ClosePipelineCommandHandler implements 
CommandHandler {
    */
   public ClosePipelineCommandHandler(ConfigurationSource conf,
                                      Executor executor) {
-    this(RatisHelper.newRaftClient(conf), executor);
+    this(RatisHelper.newRaftClientNoRetry(conf), executor);
   }
 
   /**
@@ -105,14 +106,16 @@ public class ClosePipelineCommandHandler implements 
CommandHandler {
       try {
         XceiverServerSpi server = ozoneContainer.getWriteChannel();
         if (server.isExist(pipelineIdProto)) {
-          server.removeGroup(pipelineIdProto);
           if (server instanceof XceiverServerRatis) {
             // TODO: Refactor Ratis logic to XceiverServerRatis
             // Propagate the group remove to the other Raft peers in the 
pipeline
             XceiverServerRatis ratisServer = (XceiverServerRatis) server;
             final RaftGroupId raftGroupId = 
RaftGroupId.valueOf(pipelineID.getId());
-            final Collection<RaftPeer> peers = 
ratisServer.getRaftPeersInPipeline(pipelineID);
             final boolean shouldDeleteRatisLogDirectory = 
ratisServer.getShouldDeleteRatisLogDirectory();
+            // This might throw GroupMismatchException if the Ratis group has 
been closed by other datanodes
+            final Collection<RaftPeer> peers = 
ratisServer.getRaftPeersInPipeline(pipelineID);
+            // Try to send remove group for the other datanodes first, 
ignoring GroupMismatchException
+            // if the Ratis group has been closed in the other datanodes
             peers.stream()
                 .filter(peer -> 
!peer.getId().equals(ratisServer.getServer().getId()))
                 .forEach(peer -> {
@@ -122,19 +125,34 @@ public class ClosePipelineCommandHandler implements 
CommandHandler {
                   } catch (GroupMismatchException ae) {
                     // ignore silently since this means that the group has 
been closed by earlier close pipeline
                     // command in another datanode
+                    LOG.debug("Failed to remove group {} for pipeline {} on 
peer {} since the group has " +
+                        "been removed by earlier close pipeline command 
handled in another datanode", raftGroupId,
+                        pipelineID, peer.getId());
                   } catch (IOException ioe) {
-                    LOG.warn("Failed to remove group {} for peer {}", 
raftGroupId, peer.getId(), ioe);
+                    LOG.warn("Failed to remove group {} of pipeline {} on peer 
{}",
+                        raftGroupId, pipelineID, peer.getId(), ioe);
                   }
                 });
           }
+          // Remove the Ratis group from the current datanode pipeline, might 
throw GroupMismatchException as
+          // well. It is a no-op for XceiverServerSpi implementations (e.g. 
XceiverServerGrpc)
+          server.removeGroup(pipelineIdProto);
           LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
               dn.getUuidString());
         } else {
-          LOG.debug("Ignoring close pipeline command for pipeline {} " +
-              "as it does not exist", pipelineID);
+          LOG.debug("Ignoring close pipeline command for pipeline {} on 
datanode {} " +
+              "as it does not exist", pipelineID, dn.getUuidString());
         }
       } catch (IOException e) {
-        LOG.error("Can't close pipeline {}", pipelineID, e);
+        Throwable gme = HddsClientUtils.containsException(e, 
GroupMismatchException.class);
+        if (gme != null) {
+          // ignore silently since this means that the group has been closed 
by earlier close pipeline
+          // command in another datanode
+          LOG.debug("The group for pipeline {} on datanode {} has been removed 
by earlier close " +
+              "pipeline command handled in another datanode", pipelineID, 
dn.getUuidString());
+        } else {
+          LOG.error("Can't close pipeline {}", pipelineID, e);
+        }
       } finally {
         long endTime = Time.monotonicNow();
         totalTime += endTime - startTime;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to