This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f8e7e1fa3 [AINode] Fix the bug that AINode would not stop during 
remove process (#17088)
81f8e7e1fa3 is described below

commit 81f8e7e1fa36fba141ca4a94a829fbb5bac158a7
Author: Yongzao <[email protected]>
AuthorDate: Tue Jan 27 18:03:33 2026 +0800

    [AINode] Fix the bug that AINode would not stop during remove process 
(#17088)
---
 .../client/sync/CnToAnSyncRequestType.java         |  25 ++++
 .../client/sync/SyncAINodeClientPool.java          | 151 ++++++++++++++++++++
 .../procedure/impl/node/RemoveAINodeProcedure.java |  22 ++-
 .../iotdb/commons/client/ClientPoolFactory.java    |  22 +++
 .../commons/client/sync/SyncAINodeClient.java      | 153 +++++++++++++++++++++
 5 files changed, 360 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java
new file mode 100644
index 00000000000..9636d2ecf5f
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.sync;
+
+/**
+ * Identifies the kind of synchronous request that is sent to an AINode. {@code
+ * SyncAINodeClientPool} maps every constant of this enum to the client call that performs it.
+ */
+public enum CnToAnSyncRequestType {
+  // Node Maintenance
+  /** Dispatched as {@code stopAINode()} on the borrowed AINode client. */
+  STOP_AI_NODE,
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java
new file mode 100644
index 00000000000..5ef9df1c997
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncAINodeClientPool.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncAINodeClient;
+import org.apache.iotdb.commons.exception.UncheckedStartupException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.ratis.util.function.CheckedBiFunction;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Singleton that sends synchronous requests to AINodes, retrying failed attempts with a backoff.
+ *
+ * <p>Each {@link CnToAnSyncRequestType} is mapped to the client call that performs it; the
+ * constructor fails fast when a request type has no registered action.
+ */
+public class SyncAINodeClientPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncAINodeClientPool.class);
+
+  /** Number of attempts used by {@link #sendSyncRequestToAINodeWithRetry}. */
+  private static final int DEFAULT_RETRY_NUM = 10;
+
+  private final IClientManager<TEndPoint, SyncAINodeClient> clientManager;
+
+  /** Dispatch table: request type -> (request, client) -> response. */
+  protected ImmutableMap<
+          CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient, Object, Exception>>
+      actionMap;
+
+  private SyncAINodeClientPool() {
+    clientManager =
+        new IClientManager.Factory<TEndPoint, SyncAINodeClient>()
+            .createClientManager(new ClientPoolFactory.SyncAINodeClientPoolFactory());
+    buildActionMap();
+    checkActionMapCompleteness();
+  }
+
+  /** Registers the client call that serves each request type. */
+  private void buildActionMap() {
+    ImmutableMap.Builder<
+            CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient, Object, Exception>>
+        actionMapBuilder = ImmutableMap.builder();
+    actionMapBuilder.put(CnToAnSyncRequestType.STOP_AI_NODE, (req, client) -> client.stopAINode());
+    actionMap = actionMapBuilder.build();
+  }
+
+  /**
+   * Verifies that every {@link CnToAnSyncRequestType} has an entry in {@link #actionMap}.
+   *
+   * @throws UncheckedStartupException if at least one request type is not registered
+   */
+  private void checkActionMapCompleteness() {
+    List<CnToAnSyncRequestType> lackList =
+        Arrays.stream(CnToAnSyncRequestType.values())
+            .filter(type -> !actionMap.containsKey(type))
+            .collect(Collectors.toList());
+    if (!lackList.isEmpty()) {
+      throw new UncheckedStartupException(
+          String.format("These request types should be added to actionMap: %s", lackList));
+    }
+  }
+
+  /**
+   * Sends the request with the default number of attempts ({@value #DEFAULT_RETRY_NUM}).
+   *
+   * @param endPoint target AINode
+   * @param req request payload handed to the registered action (may be {@code null})
+   * @param requestType kind of request to execute
+   * @return the response of the registered action, or a {@link TSStatus} with code {@code
+   *     INTERNAL_REQUEST_RETRY_ERROR} when every attempt failed
+   */
+  public Object sendSyncRequestToAINodeWithRetry(
+      TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType) {
+    return sendSyncRequestToAINodeWithGivenRetry(endPoint, req, requestType, DEFAULT_RETRY_NUM);
+  }
+
+  /**
+   * Sends the request, attempting it at most {@code retryNum} times.
+   *
+   * <p>Between two attempts the calling thread sleeps according to {@link #doRetryWait(int)}. If
+   * the thread is interrupted while waiting, no further attempt is made: the interrupt flag stays
+   * set and the failure status is returned.
+   *
+   * @param endPoint target AINode
+   * @param req request payload handed to the registered action (may be {@code null})
+   * @param requestType kind of request to execute
+   * @param retryNum maximum number of attempts; no attempt is made when it is not positive
+   * @return the response of the registered action, or a {@link TSStatus} with code {@code
+   *     INTERNAL_REQUEST_RETRY_ERROR} when no attempt succeeded
+   */
+  public Object sendSyncRequestToAINodeWithGivenRetry(
+      TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType, int retryNum) {
+    // Placeholder that is only reported when retryNum <= 0, i.e. no attempt is made at all.
+    Throwable lastException = new TException("No attempt was made, retryNum=" + retryNum);
+    for (int retry = 0; retry < retryNum; retry++) {
+      try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) {
+        return executeSyncRequest(requestType, client, req);
+      } catch (Exception e) {
+        lastException = e;
+        if (retry == retryNum - 1) {
+          break;
+        }
+        LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType, endPoint, retry + 1);
+        doRetryWait(retry);
+        if (Thread.currentThread().isInterrupted()) {
+          // Every later sleep would fail immediately, so the remaining attempts would run
+          // back-to-back without any backoff. Stop here instead.
+          LOGGER.warn(
+              "Interrupted while retrying {} on AINode {}, giving up.", requestType, endPoint);
+          break;
+        }
+      }
+    }
+    LOGGER.error("{} failed on AINode {}", requestType, endPoint, lastException);
+    // Some exceptions carry no message; fall back to toString() so the status is never "null".
+    String reason =
+        lastException.getMessage() != null ? lastException.getMessage() : lastException.toString();
+    return new TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode())
+        .setMessage("All retry failed due to: " + reason);
+  }
+
+  /** Looks up the action registered for {@code requestType} and applies it to the client. */
+  private Object executeSyncRequest(
+      CnToAnSyncRequestType requestType, SyncAINodeClient client, Object req) throws Exception {
+    return Objects.requireNonNull(
+            actionMap.get(requestType), "No action registered for request type " + requestType)
+        .apply(req, client);
+  }
+
+  /**
+   * Sleeps before the next attempt: 800 ms after attempts 0-3, 1600 ms after attempt 4 and 3200
+   * ms afterwards. An interruption is logged and the interrupt flag is restored.
+   */
+  private void doRetryWait(int retryNum) {
+    final long waitMs;
+    if (retryNum < 3) {
+      waitMs = 800L;
+    } else if (retryNum < 5) {
+      // 100 * 2^retryNum: 800 ms for retryNum 3, 1600 ms for retryNum 4
+      waitMs = 100L << retryNum;
+    } else {
+      waitMs = 3200L;
+    }
+    try {
+      TimeUnit.MILLISECONDS.sleep(waitMs);
+    } catch (InterruptedException e) {
+      LOGGER.warn("Retry wait failed.", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Holder that creates the singleton when {@link #getInstance()} first touches it. */
+  private static class ClientPoolHolder {
+
+    private static final SyncAINodeClientPool INSTANCE = new SyncAINodeClientPool();
+
+    private ClientPoolHolder() {
+      // Empty constructor
+    }
+  }
+
+  /** Returns the shared {@link SyncAINodeClientPool} instance. */
+  public static SyncAINodeClientPool getInstance() {
+    return SyncAINodeClientPool.ClientPoolHolder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
index 2a1c6881b14..facc2ce4ab7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java
@@ -22,13 +22,13 @@ package org.apache.iotdb.confignode.procedure.impl.node;
 import org.apache.iotdb.common.rpc.thrift.TAINodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.client.sync.CnToAnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncAINodeClientPool;
 import 
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.RemoveAINodeState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.db.protocol.client.an.AINodeClient;
-import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -65,16 +65,13 @@ public class RemoveAINodeProcedure extends 
AbstractNodeProcedure<RemoveAINodeSta
     try {
       switch (state) {
         case NODE_STOP:
-          TSStatus resp = null;
-          try (AINodeClient client =
-              AINodeClientManager.getInstance()
-                  .borrowClient(AINodeClientManager.AINODE_ID_PLACEHOLDER)) {
-            resp = client.stopAINode();
-          } catch (Exception e) {
-            LOGGER.warn(
-                "Failed to stop AINode {}, but the remove process will 
continue.",
-                removedAINode.getInternalEndPoint());
-          }
+          TSStatus resp =
+              (TSStatus)
+                  SyncAINodeClientPool.getInstance()
+                      .sendSyncRequestToAINodeWithRetry(
+                          removedAINode.getInternalEndPoint(),
+                          null,
+                          CnToAnSyncRequestType.STOP_AI_NODE);
           if (resp != null && resp.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             LOGGER.info("Successfully stopped AINode {}", 
removedAINode.getInternalEndPoint());
           } else {
@@ -92,7 +89,6 @@ public class RemoveAINodeProcedure extends 
AbstractNodeProcedure<RemoveAINodeSta
               env.getConfigManager()
                   .getConsensusManager()
                   .write(new RemoveAINodePlan(removedAINode));
-
           if (response.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             throw new ProcedureException(
                 String.format(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index d09b9245e84..7ed34359b21 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import 
org.apache.iotdb.commons.client.property.ThriftClientProperty.DefaultProperty;
+import org.apache.iotdb.commons.client.sync.SyncAINodeClient;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import 
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
@@ -393,6 +394,27 @@ public class ClientPoolFactory {
     }
   }
 
+  /** Builds the keyed object pool that holds {@link SyncAINodeClient}s per endpoint. */
+  public static class SyncAINodeClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, SyncAINodeClient> {
+
+    @Override
+    public GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> createClientPool(
+        ClientManager<TEndPoint, SyncAINodeClient> manager) {
+      // Thrift-level settings shared by every client this pool creates.
+      final ThriftClientProperty thriftProperty =
+          new ThriftClientProperty.Builder()
+              .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+              .build();
+      final SyncAINodeClient.Factory clientFactory =
+          new SyncAINodeClient.Factory(manager, thriftProperty);
+      final GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> pool =
+          new GenericKeyedObjectPool<>(
+              clientFactory,
+              new ClientPoolProperty.Builder<SyncAINodeClient>().build().getConfig());
+      // Expose the pool to the client-manager metrics under this factory's simple class name.
+      ClientManagerMetrics.getInstance().registerClientManager(getClass().getSimpleName(), pool);
+      return pool;
+    }
+  }
+
   public static class AsyncAINodeHeartbeatServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, 
AsyncAINodeInternalServiceClient> {
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java
new file mode 100644
index 00000000000..054b6446099
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.sync;
+
+import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+/**
+ * Synchronous Thrift client for the AINode RPC service ({@link IAINodeRPCService}).
+ *
+ * <p>Each instance remembers the {@link ClientManager} it belongs to: {@link #close()} hands the
+ * client back to that manager, while {@link #invalidate()} closes the underlying transport.
+ */
+public class SyncAINodeClient extends IAINodeRPCService.Client
+    implements ThriftClient, AutoCloseable {
+
+  private final boolean printLogWhenEncounterException;
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, SyncAINodeClient> clientManager;
+  private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
+
+  /**
+   * Creates the client and opens its transport if it is not open yet.
+   *
+   * <p>When internal SSL is enabled in the common config, the transport is created from the
+   * endpoint plus the configured trust store and key store; otherwise a {@link TSocket} to the
+   * endpoint is wrapped. Both variants use the connection timeout of {@code property}.
+   *
+   * @param property Thrift settings (protocol factory, connection timeout, logging flag)
+   * @param endpoint address of the AINode this client talks to
+   * @param clientManager manager this client is returned to on {@link #close()}
+   * @throws TTransportException declared by the transport creation and {@code open()} calls
+   */
+  public SyncAINodeClient(
+      ThriftClientProperty property,
+      TEndPoint endpoint,
+      ClientManager<TEndPoint, SyncAINodeClient> clientManager)
+      throws TTransportException {
+    super(
+        property
+            .getProtocolFactory()
+            .getProtocol(
+                COMMON_CONFIG.isEnableInternalSSL()
+                    ? DeepCopyRpcTransportFactory.INSTANCE.getTransport(
+                        endpoint.getIp(),
+                        endpoint.getPort(),
+                        property.getConnectionTimeoutMs(),
+                        COMMON_CONFIG.getTrustStorePath(),
+                        COMMON_CONFIG.getTrustStorePwd(),
+                        COMMON_CONFIG.getKeyStorePath(),
+                        COMMON_CONFIG.getKeyStorePwd())
+                    : DeepCopyRpcTransportFactory.INSTANCE.getTransport(
+                        new TSocket(
+                            TConfigurationConst.defaultTConfiguration,
+                            endpoint.getIp(),
+                            endpoint.getPort(),
+                            property.getConnectionTimeoutMs()))));
+    this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    if (!getInputProtocol().getTransport().isOpen()) {
+      getInputProtocol().getTransport().open();
+    }
+  }
+
+  /**
+   * Returns the timeout reported by the underlying transport, which is cast to {@link
+   * TimeoutChangeableTransport}.
+   */
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) 
getInputProtocol().getTransport()).getTimeOut();
+  }
+
+  /** Sets the timeout on the underlying transport. */
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) 
(getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public TEndPoint getEndpoint() {
+    return endpoint;
+  }
+
+  public ClientManager<TEndPoint, SyncAINodeClient> getClientManager() {
+    return clientManager;
+  }
+
+  /** Returns this client to its manager; the transport is not closed here. */
+  @Override
+  public void close() throws Exception {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  /** Closes the underlying transport of this client. */
+  @Override
+  public void invalidate() {
+    getInputProtocol().getTransport().close();
+  }
+
+  /** Asks the manager to clear what it holds for this client's endpoint. */
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  @Override
+  public boolean printLogWhenEncounterException() {
+    return printLogWhenEncounterException;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("SyncAINodeClient{%s}", endpoint);
+  }
+
+  /** Pool factory that creates, validates and destroys {@link SyncAINodeClient}s. */
+  public static class Factory extends ThriftClientFactory<TEndPoint, 
SyncAINodeClient> {
+
+    public Factory(
+        ClientManager<TEndPoint, SyncAINodeClient> clientManager,
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
+    }
+
+    /** Closes the transport of the client that is being removed from the pool. */
+    @Override
+    public void destroyObject(TEndPoint endpoint, 
PooledObject<SyncAINodeClient> pooledObject) {
+      pooledObject.getObject().invalidate();
+    }
+
+    /**
+     * Creates a pooled client through {@code SyncThriftClientWithErrorHandler.newErrorHandler}.
+     *
+     * <p>The constructor is looked up reflectively with {@code getConstructor}, so the public
+     * (property, endpoint, manager) constructor of {@link SyncAINodeClient} must keep its
+     * parameter order. NOTE(review): the lookup uses the runtime classes of the arguments, so
+     * it presumably relies on them being exactly the declared parameter types - confirm.
+     */
+    @Override
+    public PooledObject<SyncAINodeClient> makeObject(TEndPoint endpoint) 
throws Exception {
+      return new DefaultPooledObject<>(
+          SyncThriftClientWithErrorHandler.newErrorHandler(
+              SyncAINodeClient.class,
+              SyncAINodeClient.class.getConstructor(
+                  thriftClientProperty.getClass(), endpoint.getClass(), 
clientManager.getClass()),
+              thriftClientProperty,
+              endpoint,
+              clientManager));
+    }
+
+    /** A pooled client is considered valid while its transport is still open. */
+    @Override
+    public boolean validateObject(TEndPoint endpoint, 
PooledObject<SyncAINodeClient> pooledObject) {
+      return 
pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+    }
+  }
+}

Reply via email to