This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 57c012620c9 [to dev/1.3] fix AbstractEnv.ensureNodeStatus & Fix: send set configuration only to target nodes and harden compaction schedule interruption handling (#17447)
57c012620c9 is described below

commit 57c012620c969b36231a95ee1fe6d090f81cf31d
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 22 17:17:08 2026 +0800

    [to dev/1.3] fix AbstractEnv.ensureNodeStatus & Fix: send set configuration only to target nodes and harden compaction schedule interruption handling (#17447)
---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      | 28 ++++++++++++++++++----
 .../iotdb/confignode/manager/node/NodeManager.java |  2 +-
 .../schedule/CompactionScheduleTaskManager.java    |  9 +++++++
 .../schedule/CompactionScheduleTaskWorker.java     | 21 ++++++++++++++--
 .../compaction/schedule/TTLScheduleTask.java       | 17 +++++++++++--
 5 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 60ff77aa03c..07b7364d004 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -56,7 +56,10 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.pool.SessionPool;
 
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -1119,7 +1122,7 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public void ensureNodeStatus(
-      final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
+      final List<BaseNodeWrapper> nodes, final List<NodeStatus> 
targetStatusList)
       throws IllegalStateException {
     Throwable lastException = null;
     for (int i = 0; i < retryCount; i++) {
@@ -1147,7 +1150,9 @@ public abstract class AbstractEnv implements BaseEnv {
                             + node.getClientRpcEndPoint().getPort(),
                         node.getDataNodeId()));
         for (int j = 0; j < nodes.size(); j++) {
-          final String endpoint = nodes.get(j).getIpAndPortString();
+          BaseNodeWrapper nodeWrapper = nodes.get(j);
+          String ipAndPortString = nodeWrapper.getIpAndPortString();
+          final String endpoint = ipAndPortString;
           if (!nodeIds.containsKey(endpoint)) {
             // Node not exist
             // Notice: Never modify this line, since the NodeLocation might be 
modified in IT
@@ -1155,12 +1160,27 @@ public abstract class AbstractEnv implements BaseEnv {
             continue;
           }
           final String status = 
showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
-          if (!targetStatus.get(j).getStatus().equals(status)) {
+          final NodeStatus targetStatus = targetStatusList.get(j);
+          if (!targetStatus.getStatus().equals(status)) {
             // Error status
             errorMessages.add(
                 String.format(
                     "Node %s is in status %s, but expected %s",
-                    endpoint, status, targetStatus.get(j)));
+                    endpoint, status, targetStatusList.get(j)));
+            continue;
+          }
+          if (nodeWrapper instanceof DataNodeWrapper && 
targetStatus.equals(NodeStatus.Running)) {
+            final String[] ipPort = 
nodeWrapper.getIpAndPortString().split(":");
+            final String ip = ipPort[0];
+            final int port = Integer.parseInt(ipPort[1]);
+            try (TSocket socket = new TSocket(new TConfiguration(), ip, port, 
1000)) {
+              socket.open();
+            } catch (final TTransportException e) {
+              errorMessages.add(
+                  String.format(
+                      "DataNode %s is not reachable: %s",
+                      nodeWrapper.getIpAndPortString(), e.getMessage()));
+            }
           }
         }
         if (errorMessages.isEmpty()) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2e0ac35d5df..81cad2f3bef 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -930,7 +930,7 @@ public class NodeManager {
     if (!targetDataNodes.isEmpty()) {
       DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
           new DataNodeAsyncRequestContext<>(
-              CnToDnAsyncRequestType.SET_CONFIGURATION, req, 
dataNodeLocationMap);
+              CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
       CnToDnInternalServiceAsyncRequestManager.getInstance()
           .sendAsyncRequestWithRetry(clientHandler);
       responseList.addAll(clientHandler.getResponseList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
index 8348b878137..516b1489a20 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
@@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService {
       ConcurrentHashMap.newKeySet();
   private ReentrantLock lock = new ReentrantLock();
   private volatile boolean init = false;
+  private volatile boolean isStoppingAllScheduleTask = false;
 
   @Override
   public void start() throws StartupException {
@@ -76,8 +77,13 @@ public class CompactionScheduleTaskManager implements IService {
     logger.info("Compaction schedule task manager started.");
   }
 
+  public boolean isStoppingAllScheduleTask() {
+    return isStoppingAllScheduleTask;
+  }
+
   public void stopCompactionScheduleTasks() throws InterruptedException {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       for (Future<Void> task : submitCompactionScheduleTaskFutures) {
         task.cancel(true);
@@ -121,6 +127,7 @@ public class CompactionScheduleTaskManager implements IService {
 
   public void startScheduleTasks() {
     lock.lock();
+    isStoppingAllScheduleTask = false;
     try {
       // compaction selector
       for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
@@ -144,6 +151,7 @@ public class CompactionScheduleTaskManager implements IService {
   @Override
   public void stop() {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       if (!init) {
         return;
@@ -160,6 +168,7 @@ public class CompactionScheduleTaskManager implements IService {
   @Override
   public void waitAndStop(long milliseconds) {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       if (!init) {
         return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index 17ad0dd4334..714d232d625 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -72,9 +72,26 @@ public class CompactionScheduleTaskWorker implements Callable<Void> {
           dataRegion.executeCompaction();
         }
       } catch (InterruptedException ignored) {
+        boolean isStoppedByUser =
+            
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
         logger.info(
-            "[CompactionScheduleTaskWorker-{}] compaction schedule is 
interrupted", workerId);
-        return null;
+            "[CompactionScheduleTaskWorker-{}] compaction schedule is 
interrupted, isStopByUser: {}",
+            workerId,
+            isStoppedByUser);
+        if (isStoppedByUser) {
+          return null;
+        }
+      } catch (Exception e) {
+        logger.error(
+            "[CompactionScheduleTaskWorker-{}] Failed to execute compaction 
schedule task",
+            workerId,
+            e);
+      } catch (Throwable t) {
+        logger.error(
+            "[CompactionScheduleTaskWorker-{}] Failed to execute compaction 
schedule task and cannot recover",
+            workerId,
+            t);
+        throw t;
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index 393a9f6d2dc..c8cba2b5292 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -66,8 +66,21 @@ public class TTLScheduleTask implements Callable<Void> {
           }
         }
       } catch (InterruptedException ignored) {
-        logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
-        return null;
+        boolean isStoppedByUser =
+            
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
+        logger.info(
+            "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: 
{}",
+            workerId,
+            isStoppedByUser);
+        if (isStoppedByUser) {
+          return null;
+        }
+      } catch (Exception e) {
+        logger.error("[TTLCheckTask-{}] Failed to execute ttl check", 
workerId, e);
+      } catch (Throwable t) {
+        logger.error(
+            "[TTLCheckTask-{}] Failed to execute ttl check and cannot 
recover", workerId, t);
+        throw t;
       }
     }
   }

Reply via email to