This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 81f8e7e1fa3 [AINode] Fix the bug that AINode would not stop during
remove process (#17088)
81f8e7e1fa3 is described below
commit 81f8e7e1fa36fba141ca4a94a829fbb5bac158a7
Author: Yongzao <[email protected]>
AuthorDate: Tue Jan 27 18:03:33 2026 +0800
[AINode] Fix the bug that AINode would not stop during remove process
(#17088)
---
.../client/sync/CnToAnSyncRequestType.java | 25 ++++
.../client/sync/SyncAINodeClientPool.java | 151 ++++++++++++++++++++
.../procedure/impl/node/RemoveAINodeProcedure.java | 22 ++-
.../iotdb/commons/client/ClientPoolFactory.java | 22 +++
.../commons/client/sync/SyncAINodeClient.java | 153 +++++++++++++++++++++
5 files changed, 360 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java
new file mode 100644
index 00000000000..9636d2ecf5f
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.sync;
+
+public enum CnToAnSyncRequestType {
+ // Node Maintenance
+ STOP_AI_NODE,
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java
new file mode 100644
index 00000000000..5ef9df1c997
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncAINodeClient;
+import org.apache.iotdb.commons.exception.UncheckedStartupException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.ratis.util.function.CheckedBiFunction;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class SyncAINodeClientPool {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SyncAINodeClientPool.class);
+
+ private static final int DEFAULT_RETRY_NUM = 10;
+
+ private final IClientManager<TEndPoint, SyncAINodeClient> clientManager;
+
+ protected ImmutableMap<
+ CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient,
Object, Exception>>
+ actionMap;
+
+ private SyncAINodeClientPool() {
+ clientManager =
+ new IClientManager.Factory<TEndPoint, SyncAINodeClient>()
+ .createClientManager(new
ClientPoolFactory.SyncAINodeClientPoolFactory());
+ buildActionMap();
+ checkActionMapCompleteness();
+ }
+
+ private void buildActionMap() {
+ ImmutableMap.Builder<
+ CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient,
Object, Exception>>
+ actionMapBuilder = ImmutableMap.builder();
+ actionMapBuilder.put(CnToAnSyncRequestType.STOP_AI_NODE, (req, client) ->
client.stopAINode());
+ actionMap = actionMapBuilder.build();
+ }
+
+ private void checkActionMapCompleteness() {
+ List<CnToAnSyncRequestType> lackList =
+ Arrays.stream(CnToAnSyncRequestType.values())
+ .filter(type -> !actionMap.containsKey(type))
+ .collect(Collectors.toList());
+ if (!lackList.isEmpty()) {
+ throw new UncheckedStartupException(
+ String.format("These request types should be added to actionMap:
%s", lackList));
+ }
+ }
+
+ public Object sendSyncRequestToAINodeWithRetry(
+ TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType) {
+ Throwable lastException = new TException();
+ for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) {
+ try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) {
+ return executeSyncRequest(requestType, client, req);
+ } catch (Exception e) {
+ lastException = e;
+ if (retry != DEFAULT_RETRY_NUM - 1) {
+ LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType,
endPoint, retry + 1);
+ doRetryWait(retry);
+ }
+ }
+ }
+ LOGGER.error("{} failed on AINode {}", requestType, endPoint,
lastException);
+ return new
TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode())
+ .setMessage("All retry failed due to: " + lastException.getMessage());
+ }
+
+ public Object sendSyncRequestToAINodeWithGivenRetry(
+ TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType, int
retryNum) {
+ Throwable lastException = new TException();
+ for (int retry = 0; retry < retryNum; retry++) {
+ try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) {
+ return executeSyncRequest(requestType, client, req);
+ } catch (Exception e) {
+ lastException = e;
+ if (retry != retryNum - 1) {
+ LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType,
endPoint, retry + 1);
+ doRetryWait(retry);
+ }
+ }
+ }
+ LOGGER.error("{} failed on AINode {}", requestType, endPoint,
lastException);
+ return new
TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode())
+ .setMessage("All retry failed due to: " + lastException.getMessage());
+ }
+
+ private Object executeSyncRequest(
+ CnToAnSyncRequestType requestType, SyncAINodeClient client, Object req)
throws Exception {
+ return Objects.requireNonNull(actionMap.get(requestType)).apply(req,
client);
+ }
+
+ private void doRetryWait(int retryNum) {
+ try {
+ if (retryNum < 3) {
+ TimeUnit.MILLISECONDS.sleep(800L);
+ } else if (retryNum < 5) {
+ TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum));
+ } else {
+ TimeUnit.MILLISECONDS.sleep(3200L);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Retry wait failed.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private static class ClientPoolHolder {
+
+ private static final SyncAINodeClientPool INSTANCE = new
SyncAINodeClientPool();
+
+ private ClientPoolHolder() {
+ // Empty constructor
+ }
+ }
+
+ public static SyncAINodeClientPool getInstance() {
+ return SyncAINodeClientPool.ClientPoolHolder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
index 2a1c6881b14..facc2ce4ab7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
@@ -22,13 +22,13 @@ package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.client.sync.CnToAnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncAINodeClientPool;
import
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.RemoveAINodeState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.db.protocol.client.an.AINodeClient;
-import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -65,16 +65,13 @@ public class RemoveAINodeProcedure extends
AbstractNodeProcedure<RemoveAINodeSta
try {
switch (state) {
case NODE_STOP:
- TSStatus resp = null;
- try (AINodeClient client =
- AINodeClientManager.getInstance()
- .borrowClient(AINodeClientManager.AINODE_ID_PLACEHOLDER)) {
- resp = client.stopAINode();
- } catch (Exception e) {
- LOGGER.warn(
- "Failed to stop AINode {}, but the remove process will
continue.",
- removedAINode.getInternalEndPoint());
- }
+ TSStatus resp =
+ (TSStatus)
+ SyncAINodeClientPool.getInstance()
+ .sendSyncRequestToAINodeWithRetry(
+ removedAINode.getInternalEndPoint(),
+ null,
+ CnToAnSyncRequestType.STOP_AI_NODE);
if (resp != null && resp.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info("Successfully stopped AINode {}",
removedAINode.getInternalEndPoint());
} else {
@@ -92,7 +89,6 @@ public class RemoveAINodeProcedure extends
AbstractNodeProcedure<RemoveAINodeSta
env.getConfigManager()
.getConsensusManager()
.write(new RemoveAINodePlan(removedAINode));
-
if (response.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new ProcedureException(
String.format(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index d09b9245e84..7ed34359b21 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.commons.client.property.ClientPoolProperty;
import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import
org.apache.iotdb.commons.client.property.ThriftClientProperty.DefaultProperty;
+import org.apache.iotdb.commons.client.sync.SyncAINodeClient;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
@@ -393,6 +394,27 @@ public class ClientPoolFactory {
}
}
+  /**
+   * Builds the keyed object pool of {@link SyncAINodeClient}s, keyed by AINode endpoint, and
+   * registers it with {@link ClientManagerMetrics}.
+   */
+  public static class SyncAINodeClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, SyncAINodeClient> {
+
+    @Override
+    public GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> createClientPool(
+        ClientManager<TEndPoint, SyncAINodeClient> manager) {
+      // NOTE(review): uses the DataNode connection-timeout setting although the pool is used by
+      // the ConfigNode; confirm this is the intended knob.
+      ThriftClientProperty thriftProperty =
+          new ThriftClientProperty.Builder()
+              .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+              .build();
+      SyncAINodeClient.Factory clientFactory =
+          new SyncAINodeClient.Factory(manager, thriftProperty);
+      GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> pool =
+          new GenericKeyedObjectPool<>(
+              clientFactory, new ClientPoolProperty.Builder<SyncAINodeClient>().build().getConfig());
+      ClientManagerMetrics.getInstance().registerClientManager(getClass().getSimpleName(), pool);
+      return pool;
+    }
+  }
+
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncAINodeInternalServiceClient> {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java
new file mode 100644
index 00000000000..054b6446099
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.sync;
+
+import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+/**
+ * Synchronous Thrift client for the AINode RPC service ({@link IAINodeRPCService}).
+ *
+ * <p>Instances are created by {@link Factory} and handed out by a {@link ClientManager} keyed by
+ * AINode endpoint. {@link #close()} returns the client to that manager rather than closing the
+ * connection. {@link #invalidate()} is what actually closes the transport.
+ */
+public class SyncAINodeClient extends IAINodeRPCService.Client
+    implements ThriftClient, AutoCloseable {
+
+  private final boolean printLogWhenEncounterException;
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, SyncAINodeClient> clientManager;
+  private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
+
+  /**
+   * Connects to {@code endpoint}, using the SSL transport when internal SSL is enabled in the
+   * common config and a plain {@link TSocket} otherwise.
+   *
+   * @param property protocol factory, connection timeout and exception-logging settings
+   * @param endpoint address of the AINode RPC service
+   * @param clientManager manager this client is returned to by {@link #close()}
+   * @throws TTransportException if the transport cannot be opened
+   */
+  public SyncAINodeClient(
+      ThriftClientProperty property,
+      TEndPoint endpoint,
+      ClientManager<TEndPoint, SyncAINodeClient> clientManager)
+      throws TTransportException {
+    super(
+        property
+            .getProtocolFactory()
+            .getProtocol(
+                COMMON_CONFIG.isEnableInternalSSL()
+                    ? DeepCopyRpcTransportFactory.INSTANCE.getTransport(
+                        endpoint.getIp(),
+                        endpoint.getPort(),
+                        property.getConnectionTimeoutMs(),
+                        COMMON_CONFIG.getTrustStorePath(),
+                        COMMON_CONFIG.getTrustStorePwd(),
+                        COMMON_CONFIG.getKeyStorePath(),
+                        COMMON_CONFIG.getKeyStorePwd())
+                    : DeepCopyRpcTransportFactory.INSTANCE.getTransport(
+                        new TSocket(
+                            TConfigurationConst.defaultTConfiguration,
+                            endpoint.getIp(),
+                            endpoint.getPort(),
+                            property.getConnectionTimeoutMs()))));
+    this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    // Open the transport only if the factory did not already return it open.
+    if (!getInputProtocol().getTransport().isOpen()) {
+      getInputProtocol().getTransport().open();
+    }
+  }
+
+  /** Returns the current socket timeout of the underlying transport. */
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+
+  /** Sets the socket timeout of the underlying transport. */
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public TEndPoint getEndpoint() {
+    return endpoint;
+  }
+
+  public ClientManager<TEndPoint, SyncAINodeClient> getClientManager() {
+    return clientManager;
+  }
+
+  /** Returns this client to its manager; the connection stays open for reuse. */
+  @Override
+  public void close() throws Exception {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  /** Closes the underlying transport. */
+  @Override
+  public void invalidate() {
+    getInputProtocol().getTransport().close();
+  }
+
+  /** Clears every pooled client for this client's endpoint. */
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  @Override
+  public boolean printLogWhenEncounterException() {
+    return printLogWhenEncounterException;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("SyncAINodeClient{%s}", endpoint);
+  }
+
+  /** Pool factory that creates, destroys and validates {@link SyncAINodeClient}s. */
+  public static class Factory extends ThriftClientFactory<TEndPoint, SyncAINodeClient> {
+
+    public Factory(
+        ClientManager<TEndPoint, SyncAINodeClient> clientManager,
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
+    }
+
+    /** Closes the transport of a client being evicted from the pool. */
+    @Override
+    public void destroyObject(TEndPoint endpoint, PooledObject<SyncAINodeClient> pooledObject) {
+      pooledObject.getObject().invalidate();
+    }
+
+    /**
+     * Creates a new connected client for {@code endpoint}, wrapped by {@link
+     * SyncThriftClientWithErrorHandler}.
+     */
+    @Override
+    public PooledObject<SyncAINodeClient> makeObject(TEndPoint endpoint) throws Exception {
+      return new DefaultPooledObject<>(
+          SyncThriftClientWithErrorHandler.newErrorHandler(
+              SyncAINodeClient.class,
+              // Look the constructor up by its declared parameter types: getConstructor matches
+              // formal types exactly, so using the arguments' runtime classes would throw
+              // NoSuchMethodException whenever a subclass instance is passed.
+              SyncAINodeClient.class.getConstructor(
+                  ThriftClientProperty.class, TEndPoint.class, ClientManager.class),
+              thriftClientProperty,
+              endpoint,
+              clientManager));
+    }
+
+    /** A pooled client is reusable only while its transport is still open. */
+    @Override
+    public boolean validateObject(TEndPoint endpoint, PooledObject<SyncAINodeClient> pooledObject) {
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+    }
+  }
+}