This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 57c012620c9 [to dev/1.3] fix AbstractEnv.ensureNodeStatus & Fix: send
set configuration only to target nodes and harden compaction schedule
interruption handling (#17447)
57c012620c9 is described below
commit 57c012620c969b36231a95ee1fe6d090f81cf31d
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 22 17:17:08 2026 +0800
[to dev/1.3] fix AbstractEnv.ensureNodeStatus & Fix: send set configuration
only to target nodes and harden compaction schedule interruption handling
(#17447)
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 28 ++++++++++++++++++----
.../iotdb/confignode/manager/node/NodeManager.java | 2 +-
.../schedule/CompactionScheduleTaskManager.java | 9 +++++++
.../schedule/CompactionScheduleTaskWorker.java | 21 ++++++++++++++--
.../compaction/schedule/TTLScheduleTask.java | 17 +++++++++++--
5 files changed, 68 insertions(+), 9 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 60ff77aa03c..07b7364d004 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -56,7 +56,10 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import java.io.File;
@@ -1119,7 +1122,7 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public void ensureNodeStatus(
- final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
+ final List<BaseNodeWrapper> nodes, final List<NodeStatus>
targetStatusList)
throws IllegalStateException {
Throwable lastException = null;
for (int i = 0; i < retryCount; i++) {
@@ -1147,7 +1150,9 @@ public abstract class AbstractEnv implements BaseEnv {
+ node.getClientRpcEndPoint().getPort(),
node.getDataNodeId()));
for (int j = 0; j < nodes.size(); j++) {
- final String endpoint = nodes.get(j).getIpAndPortString();
+ BaseNodeWrapper nodeWrapper = nodes.get(j);
+ String ipAndPortString = nodeWrapper.getIpAndPortString();
+ final String endpoint = ipAndPortString;
if (!nodeIds.containsKey(endpoint)) {
// Node not exist
// Notice: Never modify this line, since the NodeLocation might be
modified in IT
@@ -1155,12 +1160,27 @@ public abstract class AbstractEnv implements BaseEnv {
continue;
}
final String status =
showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
- if (!targetStatus.get(j).getStatus().equals(status)) {
+ final NodeStatus targetStatus = targetStatusList.get(j);
+ if (!targetStatus.getStatus().equals(status)) {
// Error status
errorMessages.add(
String.format(
"Node %s is in status %s, but expected %s",
- endpoint, status, targetStatus.get(j)));
+ endpoint, status, targetStatusList.get(j)));
+ continue;
+ }
+ if (nodeWrapper instanceof DataNodeWrapper &&
targetStatus.equals(NodeStatus.Running)) {
+ final String[] ipPort =
nodeWrapper.getIpAndPortString().split(":");
+ final String ip = ipPort[0];
+ final int port = Integer.parseInt(ipPort[1]);
+ try (TSocket socket = new TSocket(new TConfiguration(), ip, port,
1000)) {
+ socket.open();
+ } catch (final TTransportException e) {
+ errorMessages.add(
+ String.format(
+ "DataNode %s is not reachable: %s",
+ nodeWrapper.getIpAndPortString(), e.getMessage()));
+ }
}
}
if (errorMessages.isEmpty()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2e0ac35d5df..81cad2f3bef 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -930,7 +930,7 @@ public class NodeManager {
if (!targetDataNodes.isEmpty()) {
DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_CONFIGURATION, req,
dataNodeLocationMap);
+ CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
CnToDnInternalServiceAsyncRequestManager.getInstance()
.sendAsyncRequestWithRetry(clientHandler);
responseList.addAll(clientHandler.getResponseList());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
index 8348b878137..516b1489a20 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
@@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements
IService {
ConcurrentHashMap.newKeySet();
private ReentrantLock lock = new ReentrantLock();
private volatile boolean init = false;
+ private volatile boolean isStoppingAllScheduleTask = false;
@Override
public void start() throws StartupException {
@@ -76,8 +77,13 @@ public class CompactionScheduleTaskManager implements
IService {
logger.info("Compaction schedule task manager started.");
}
+ public boolean isStoppingAllScheduleTask() {
+ return isStoppingAllScheduleTask;
+ }
+
public void stopCompactionScheduleTasks() throws InterruptedException {
lock.lock();
+ isStoppingAllScheduleTask = true;
try {
for (Future<Void> task : submitCompactionScheduleTaskFutures) {
task.cancel(true);
@@ -121,6 +127,7 @@ public class CompactionScheduleTaskManager implements
IService {
public void startScheduleTasks() {
lock.lock();
+ isStoppingAllScheduleTask = false;
try {
// compaction selector
for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
@@ -144,6 +151,7 @@ public class CompactionScheduleTaskManager implements
IService {
@Override
public void stop() {
lock.lock();
+ isStoppingAllScheduleTask = true;
try {
if (!init) {
return;
@@ -160,6 +168,7 @@ public class CompactionScheduleTaskManager implements
IService {
@Override
public void waitAndStop(long milliseconds) {
lock.lock();
+ isStoppingAllScheduleTask = true;
try {
if (!init) {
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index 17ad0dd4334..714d232d625 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -72,9 +72,26 @@ public class CompactionScheduleTaskWorker implements
Callable<Void> {
dataRegion.executeCompaction();
}
} catch (InterruptedException ignored) {
+ boolean isStoppedByUser =
+
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
logger.info(
- "[CompactionScheduleTaskWorker-{}] compaction schedule is
interrupted", workerId);
- return null;
+ "[CompactionScheduleTaskWorker-{}] compaction schedule is
interrupted, isStopByUser: {}",
+ workerId,
+ isStoppedByUser);
+ if (isStoppedByUser) {
+ return null;
+ }
+ } catch (Exception e) {
+ logger.error(
+ "[CompactionScheduleTaskWorker-{}] Failed to execute compaction
schedule task",
+ workerId,
+ e);
+ } catch (Throwable t) {
+ logger.error(
+ "[CompactionScheduleTaskWorker-{}] Failed to execute compaction
schedule task and cannot recover",
+ workerId,
+ t);
+ throw t;
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index 393a9f6d2dc..c8cba2b5292 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -66,8 +66,21 @@ public class TTLScheduleTask implements Callable<Void> {
}
}
} catch (InterruptedException ignored) {
- logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
- return null;
+ boolean isStoppedByUser =
+
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
+ logger.info(
+ "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser:
{}",
+ workerId,
+ isStoppedByUser);
+ if (isStoppedByUser) {
+ return null;
+ }
+ } catch (Exception e) {
+ logger.error("[TTLCheckTask-{}] Failed to execute ttl check",
workerId, e);
+ } catch (Throwable t) {
+ logger.error(
+ "[TTLCheckTask-{}] Failed to execute ttl check and cannot
recover", workerId, t);
+ throw t;
}
}
}