This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch fix/add-confignode-idempotent-retry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8a2485aee4d9c6c7d2429138fa7760d06d43ec9d Author: Yongzao <[email protected]> AuthorDate: Tue Jun 9 14:51:22 2026 +0800 Fix AddConfigNode retry idempotency --- .../manager/consensus/ConsensusManager.java | 23 +++- .../thrift/ConfigNodeRPCServiceProcessor.java | 9 +- .../manager/consensus/ConsensusManagerTest.java | 116 +++++++++++++++++++++ .../thrift/ConfigNodeRPCServiceProcessorTest.java | 47 +++++++++ 4 files changed, 192 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 8b4eeed5a1b..84594b0d7a8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConfigRegionId; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; @@ -44,6 +45,9 @@ import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.RatisConfig; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; +import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; +import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.rpc.TSStatusCode; @@ -86,6 +90,12 @@ public class ConsensusManager { setConsensusLayer(stateMachine); } + @TestOnly + ConsensusManager(IManager configManager, IConsensus consensusImpl) { + this.configManager = configManager; + this.consensusImpl = consensusImpl; + } + public void start() throws IOException { consensusImpl.start(); if (SystemPropertiesUtils.isRestarted()) { @@ -289,7 +299,11 @@ public class ConsensusManager { configNodeLocation.getConfigNodeId(), configNodeLocation.getConsensusEndPoint())); } - consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList); + try { + consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList); + } catch (ConsensusGroupAlreadyExistException e) { + LOGGER.info("ConfigNode local peer has already been created: {}", e.getMessage()); + } } /** @@ -306,6 +320,9 @@ public class ConsensusManager { DEFAULT_CONSENSUS_GROUP_ID, configNodeLocation.getConfigNodeId(), configNodeLocation.getConsensusEndPoint())); + } catch (PeerAlreadyInConsensusGroupException e) { + LOGGER.info( + "ConfigNode peer {} has already been added: {}", configNodeLocation, e.getMessage()); } catch (ConsensusException e) { throw new AddPeerException(configNodeLocation); } @@ -327,6 +344,10 @@ public class ConsensusManager { configNodeLocation.getConfigNodeId(), configNodeLocation.getConsensusEndPoint())); return true; + } catch (PeerNotInConsensusGroupException e) { + LOGGER.info( + "ConfigNode peer {} has already been removed: {}", configNodeLocation, e.getMessage()); + return true; } catch (ConsensusException e) { return false; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index dbb35839045..589d4e2a9c7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -230,6 +230,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.confignode.service.ConfigNode; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.db.queryengine.plan.relational.type.AuthorRType; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -859,15 +860,19 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus deleteConfigNodePeer(TConfigNodeLocation configNodeLocation) { - if (!configManager.getNodeManager().getRegisteredConfigNodes().contains(configNodeLocation)) { + if (configNodeConfig.getConfigNodeId() != -1 + && configNodeLocation.getConfigNodeId() != configNodeConfig.getConfigNodeId()) { return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()) .setMessage( - "remove ConsensusGroup failed because the ConfigNode not in current Cluster."); + "remove ConsensusGroup failed because the target ConfigNode is not current ConfigNode."); } ConsensusGroupId groupId = configManager.getConsensusManager().getConsensusGroupId(); try { configManager.getConsensusManager().getConsensusImpl().deleteLocalPeer(groupId); + } catch (ConsensusGroupNotExistException e) { + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .setMessage(ConfigNodeMessages.REMOVE_CONSENSUSGROUP_SUCCESS); } catch (ConsensusException e) { return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()) .setMessage( diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java new file mode 100644 index 00000000000..2f3b9b31890 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.consensus; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.confignode.exception.AddPeerException; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.consensus.IConsensus; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; +import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; +import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Collections; + +public class ConsensusManagerTest { + + @Test + public void createPeerForConsensusGroupShouldIgnoreAlreadyCreatedLocalPeer() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow( + new ConsensusGroupAlreadyExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID)) + .when(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + + newConsensusManager(consensus) + .createPeerForConsensusGroup(Collections.singletonList(newConfigNodeLocation(1))); + + Mockito.verify(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + } + + @Test + public void addConfigNodePeerShouldIgnoreAlreadyAddedPeer() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow( + new PeerAlreadyInConsensusGroupException( + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, + new Peer( + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, + 1, + new TEndPoint("127.0.0.1", 10720)))) + .when(consensus) + .addRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + + newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1)); + + Mockito.verify(consensus) + .addRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + } + + @Test + public void addConfigNodePeerShouldKeepFailingForOtherConsensusErrors() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow(new ConsensusException("reconfiguration failed")) + .when(consensus) + .addRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + + Assert.assertThrows( + AddPeerException.class, + () -> newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1))); + } + + @Test + public void removeConfigNodePeerShouldIgnoreAlreadyRemovedPeer() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow( + new PeerNotInConsensusGroupException( + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, "127.0.0.1:10720")) + .when(consensus) + .removeRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + + Assert.assertTrue( + newConsensusManager(consensus).removeConfigNodePeer(newConfigNodeLocation(1))); + } + + private static ConsensusManager newConsensusManager(final IConsensus consensus) throws Exception { + return new ConsensusManager(Mockito.mock(IManager.class), consensus); + } + + private static TConfigNodeLocation newConfigNodeLocation(final int configNodeId) { + return new TConfigNodeLocation( + configNodeId, + new TEndPoint("127.0.0.1", 10710 + configNodeId), + new TEndPoint("127.0.0.1", 10720 + configNodeId)); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java index c4d993b5a79..336bba31c37 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java @@ -27,12 +27,15 @@ import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp; import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp; import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration; import org.apache.iotdb.confignode.service.ConfigNode; +import org.apache.iotdb.consensus.IConsensus; +import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport; @@ -161,4 +164,48 @@ public class ConfigNodeRPCServiceProcessorTest extends TestCase { "1.2.3.4", sentRequest.getDataNodeConfiguration().getLocation().getClientRpcEndPoint().getIp()); } + + public void testDeleteConfigNodePeerShouldIgnoreMissingLocalPeer() throws Exception { + CommonConfig commonConfig = Mockito.mock(CommonConfig.class); + ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class); + ConfigNode configNode = Mockito.mock(ConfigNode.class); + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(consensusManager.getConsensusGroupId()) + .thenReturn(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID); + Mockito.when(consensusManager.getConsensusImpl()).thenReturn(consensus); + Mockito.doThrow( + new ConsensusGroupNotExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID)) + .when(consensus) + .deleteLocalPeer(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID); + ConfigNodeRPCServiceProcessor sut = + new ConfigNodeRPCServiceProcessor( + commonConfig, configNodeConfig, configNode, configManager); + + TSStatus status = sut.deleteConfigNodePeer(new TConfigNodeLocation()); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Mockito.verify(configManager, Mockito.never()).getNodeManager(); + } + + public void testDeleteConfigNodePeerShouldRejectMismatchedTarget() throws Exception { + CommonConfig commonConfig = Mockito.mock(CommonConfig.class); + ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class); + ConfigNode configNode = Mockito.mock(ConfigNode.class); + ConfigManager configManager = Mockito.mock(ConfigManager.class); + Mockito.when(configNodeConfig.getConfigNodeId()).thenReturn(2); + ConfigNodeRPCServiceProcessor sut = + new ConfigNodeRPCServiceProcessor( + commonConfig, configNodeConfig, configNode, configManager); + + TSStatus status = + sut.deleteConfigNodePeer( + new TConfigNodeLocation( + 1, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 10720))); + + Assert.assertEquals(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode(), status.getCode()); + Mockito.verify(configManager, Mockito.never()).getConsensusManager(); + } }
