This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ac68072619 HDDS-9959. Propagate group remove to other datanodes during
pipeline close (#5827)
ac68072619 is described below
commit ac6807261937415b3f9c00b90b065a57aa5858c4
Author: Ivan Andika <[email protected]>
AuthorDate: Thu Jan 4 20:32:44 2024 +0700
HDDS-9959. Propagate group remove to other datanodes during pipeline close
(#5827)
---
.../common/statemachine/DatanodeStateMachine.java | 4 +-
.../ClosePipelineCommandHandler.java | 45 ++++++-
.../transport/server/ratis/XceiverServerRatis.java | 10 ++
.../TestClosePipelineCommandHandler.java | 144 +++++++++++++++++++++
.../hdds/scm/pipeline/RatisPipelineUtils.java | 65 +---------
5 files changed, 201 insertions(+), 67 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 51290cf80d..9677144054 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -250,8 +250,8 @@ public class DatanodeStateMachine implements Closeable {
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads(), clock,
dnConf.getCommandQueueLimit(), threadNamePrefix))
- .addHandler(
- new ClosePipelineCommandHandler(pipelineCommandExecutorService))
+ .addHandler(new ClosePipelineCommandHandler(conf,
+ pipelineCommandExecutorService))
.addHandler(new CreatePipelineCommandHandler(conf,
pipelineCommandExecutorService))
.addHandler(new SetNodeOperationalStateCommandHandler(conf,
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index f332ad4f13..5242c8686d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -16,28 +16,38 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
+import
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
/**
* Handler for close pipeline command received from SCM.
@@ -51,11 +61,23 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
private final AtomicInteger queuedCount = new AtomicInteger(0);
private long totalTime;
private final Executor executor;
+ private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
/**
* Constructs a closePipelineCommand handler.
*/
- public ClosePipelineCommandHandler(Executor executor) {
+ public ClosePipelineCommandHandler(ConfigurationSource conf,
+ Executor executor) {
+ // Production wiring: per-peer Raft clients are created by the factory
+ // that RatisHelper builds from the given configuration.
+ this(RatisHelper.newRaftClient(conf), executor);
+ }
+
+ /**
+ * Constructs a closePipelineCommand handler with an explicit Raft client
+ * factory; tests use this to inject a mocked client.
+ *
+ * @param newRaftClient factory creating a RaftClient for a given peer and
+ *     TLS config; used to send group remove requests to the other
+ *     peers of the pipeline being closed
+ * @param executor executor on which close pipeline commands are handled
+ */
+ public ClosePipelineCommandHandler(
+ BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
+ Executor executor) {
+ this.newRaftClient = newRaftClient;
this.executor = executor;
}
@@ -84,6 +106,27 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
XceiverServerSpi server = ozoneContainer.getWriteChannel();
if (server.isExist(pipelineIdProto)) {
+ if (server instanceof XceiverServerRatis) {
+   // TODO: Refactor Ratis logic to XceiverServerRatis
+   // Propagate the group remove to the other Raft peers in the pipeline.
+   // This must happen BEFORE the local removeGroup() below:
+   // getRaftPeersInPipeline() resolves the group through the local Ratis
+   // server's getDivision(), which can no longer find the group once it
+   // has been removed here, so the propagation would never run.
+   XceiverServerRatis ratisServer = (XceiverServerRatis) server;
+   final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
+   final Collection<RaftPeer> peers =
+       ratisServer.getRaftPeersInPipeline(pipelineID);
+   final boolean shouldDeleteRatisLogDirectory =
+       ratisServer.getShouldDeleteRatisLogDirectory();
+   peers.stream()
+       // The local datanode is handled by removeGroup() below.
+       .filter(peer -> !peer.getId().equals(ratisServer.getServer().getId()))
+       .forEach(peer -> {
+         try (RaftClient client = newRaftClient.apply(peer,
+             ozoneContainer.getTlsClientConfig())) {
+           client.getGroupManagementApi(peer.getId())
+               .remove(raftGroupId, shouldDeleteRatisLogDirectory,
+                   !shouldDeleteRatisLogDirectory);
+         } catch (GroupMismatchException gme) {
+           // The group is already gone on that peer, i.e. it has been closed
+           // by an earlier close pipeline command in another datanode.
+           LOG.debug("Group {} already removed on peer {}",
+               raftGroupId, peer.getId());
+         } catch (IOException ioe) {
+           LOG.warn("Failed to remove group {} for peer {}",
+               raftGroupId, peer.getId(), ioe);
+         }
+       });
+ }
server.removeGroup(pipelineIdProto);
LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
dn.getUuidString());
} else {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 4688ce4b27..6d119b17b3 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -83,6 +83,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.ClientId;
@@ -622,6 +623,10 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
return server.getDivision(id);
}
+ /**
+  * Returns the configured shouldDeleteRatisLogDirectory flag, i.e. whether
+  * Ratis log directories are deleted (rather than kept) when a Raft group
+  * is removed.
+  *
+  * @return the shouldDeleteRatisLogDirectory setting of this server
+  */
+ public boolean getShouldDeleteRatisLogDirectory() {
+   return shouldDeleteRatisLogDirectory;
+ }
+
private void processReply(RaftClientReply reply) throws IOException {
// NotLeader exception is thrown only when the raft server to which the
// request is submitted is not the leader. The request will be rejected
@@ -919,6 +924,11 @@ public final class XceiverServerRatis implements
XceiverServerSpi {
return minIndex == null ? -1 : minIndex;
}
+ public Collection<RaftPeer> getRaftPeersInPipeline(PipelineID pipelineId)
throws IOException {
+ final RaftGroupId groupId = RaftGroupId.valueOf(pipelineId.getId());
+ return server.getDivision(groupId).getGroup().getPeers();
+ }
+
public void notifyGroupRemove(RaftGroupId gid) {
raftGids.remove(gid);
// Remove any entries for group leader map
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
new file mode 100644
index 0000000000..d161f5537a
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.GroupManagementApi;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases to verify ClosePipelineCommandHandler.
+ */
+public class TestClosePipelineCommandHandler {
+
+ private OzoneContainer ozoneContainer;
+ private StateContext stateContext;
+ private SCMConnectionManager connectionManager;
+ private RaftClient raftClient;
+ private GroupManagementApi raftClientGroupManager;
+ private OzoneConfiguration conf;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ ozoneContainer = mock(OzoneContainer.class);
+ connectionManager = mock(SCMConnectionManager.class);
+ raftClient = mock(RaftClient.class);
+ raftClientGroupManager = mock(GroupManagementApi.class);
+ lenient().when(raftClient.getGroupManagementApi(
+ any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
+ }
+
+ @Test
+ void testPipelineClose() throws IOException {
+ final List<DatanodeDetails> datanodes = getDatanodes();
+ final DatanodeDetails currentDatanode = datanodes.get(0);
+ final PipelineID pipelineID = PipelineID.randomId();
+ final SCMCommand<ClosePipelineCommandProto> command =
+ new ClosePipelineCommand(pipelineID);
+ stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+ final boolean shouldDeleteRatisLogDirectory = true;
+ XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+ when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+
when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
+ when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
+ Collection<RaftPeer> raftPeers = datanodes.stream()
+ .map(RatisHelper::toRaftPeer)
+ .collect(Collectors.toList());
+ when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
+
when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
+
when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);
+
+ final ClosePipelineCommandHandler commandHandler =
+ new ClosePipelineCommandHandler((leader, tls) -> raftClient,
MoreExecutors.directExecutor());
+ commandHandler.handle(command, ozoneContainer, stateContext,
connectionManager);
+
+ verify(writeChannel, times(1))
+ .removeGroup(pipelineID.getProtobuf());
+
+ verify(raftClientGroupManager, times(2))
+ .remove(any(), eq(shouldDeleteRatisLogDirectory),
eq(!shouldDeleteRatisLogDirectory));
+ }
+
+ @Test
+ void testCommandIdempotency() throws IOException {
+ final List<DatanodeDetails> datanodes = getDatanodes();
+ final DatanodeDetails currentDatanode = datanodes.get(0);
+ final PipelineID pipelineID = PipelineID.randomId();
+ final SCMCommand<ClosePipelineCommandProto> command =
+ new ClosePipelineCommand(pipelineID);
+ stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+ XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+ when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+ // When the pipeline has been closed earlier by other datanode that
received a close pipeline command
+ when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(false);
+
+ final ClosePipelineCommandHandler commandHandler =
+ new ClosePipelineCommandHandler(conf, MoreExecutors.directExecutor());
+ commandHandler.handle(command, ozoneContainer, stateContext,
connectionManager);
+
+ verify(writeChannel, times(0))
+ .removeGroup(pipelineID.getProtobuf());
+
+ verify(raftClientGroupManager, times(0))
+ .remove(any(), anyBoolean(), anyBoolean());
+ }
+
+ private List<DatanodeDetails> getDatanodes() {
+ final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
+ final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
+ final DatanodeDetails dnThree =
MockDatanodeDetails.randomDatanodeDetails();
+ return Arrays.asList(dnOne, dnTwo, dnThree);
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 6c68b80883..04c35cc1fe 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -17,29 +17,16 @@
*/
package org.apache.hadoop.hdds.scm.pipeline;
-import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
+ * Utility class for Ratis pipelines.
*/
public final class RatisPipelineUtils {
@@ -48,56 +35,6 @@ public final class RatisPipelineUtils {
private RatisPipelineUtils() {
}
- /**
- * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
- * the datanodes.
- *
- * @param pipeline - Pipeline to be destroyed
- * @param ozoneConf - Ozone configuration
- * @param grpcTlsConfig
- * @throws IOException
- */
- public static void destroyPipeline(Pipeline pipeline,
- ConfigurationSource ozoneConf,
- GrpcTlsConfig grpcTlsConfig) {
- final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
- if (LOG.isDebugEnabled()) {
- LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
- }
- for (DatanodeDetails dn : pipeline.getNodes()) {
- try {
- destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
- } catch (IOException e) {
- LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
- pipeline.getId(), dn, e.getMessage());
- }
- }
- }
-
- /**
- * Sends ratis command to destroy pipeline on the given datanode.
- *
- * @param dn - Datanode on which pipeline needs to be destroyed
- * @param pipelineID - ID of pipeline to be destroyed
- * @param ozoneConf - Ozone configuration
- * @param grpcTlsConfig - grpc tls configuration
- * @throws IOException
- */
- static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
- ConfigurationSource ozoneConf, GrpcTlsConfig grpcTlsConfig)
- throws IOException {
- final String rpcType = ozoneConf
- .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
- ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
- final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
- final RaftPeer p = RatisHelper.toRaftPeer(dn);
- try (RaftClient client = RatisHelper
- .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
- retryPolicy, grpcTlsConfig, ozoneConf)) {
- client.getGroupManagementApi(p.getId())
- .remove(RaftGroupId.valueOf(pipelineID.getId()), true, false);
- }
- }
/**
* Return the list of pipelines who share the same set of datanodes
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]