This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4559929126e Fix: send set configuration only to target nodes and
harden compaction schedule interruption handling (#17469)
4559929126e is described below
commit 4559929126e6504b11d7a56236bd0039379959aa
Author: shuwenwei <[email protected]>
AuthorDate: Tue Apr 14 16:09:11 2026 +0800
Fix: send set configuration only to target nodes and harden compaction
schedule interruption handling (#17469)
---
.../it/db/it/IoTDBSetConfigurationTableIT.java | 14 +++++++++++++
.../iotdb/confignode/manager/node/NodeManager.java | 2 +-
.../db/storageengine/dataregion/DataRegion.java | 6 +++++-
.../compaction/execute/utils/CompactionUtils.java | 24 ++++++++++++++++++----
.../schedule/CompactionScheduleTaskManager.java | 9 ++++++++
.../schedule/CompactionScheduleTaskWorker.java | 21 +++++++++++++++++--
.../compaction/schedule/TTLScheduleTask.java | 17 +++++++++++++--
7 files changed, 83 insertions(+), 10 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
index 2295b88f1bc..8a4bc388a59 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
@@ -109,6 +109,9 @@ public class IoTDBSetConfigurationTableIT {
statement.execute(
"set configuration inner_compaction_candidate_file_num='1',max_cross_compaction_candidate_file_num='1' on "
+ dnId);
+ if (dnId == 5) {
+ statement.execute("set configuration compaction_schedule_thread_num='2' on 5");
+ }
}
} catch (Exception e) {
Assert.fail(e.getMessage());
@@ -131,6 +134,17 @@ public class IoTDBSetConfigurationTableIT {
"enable_cross_space_compaction=false",
"inner_compaction_candidate_file_num=1",
"max_cross_compaction_candidate_file_num=1"));
+ boolean scheduleThreadNumChanged =
+ checkConfigFileContains(
+ dnId,
+ EnvFactory.getEnv().getDataNodeWrapperList().get(i),
+ "compaction_schedule_thread_num=2");
+ if (scheduleThreadNumChanged && dnId != 5) {
+ Assert.fail();
+ }
+ if (!scheduleThreadNumChanged && dnId == 5) {
+ Assert.fail();
+ }
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index e3d775259d6..2e50c6b787b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -1052,7 +1052,7 @@ public class NodeManager {
if (!targetDataNodes.isEmpty()) {
DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap);
+ CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
CnToDnInternalServiceAsyncRequestManager.getInstance()
.sendAsyncRequestWithRetry(clientHandler);
responseList.addAll(clientHandler.getResponseList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index eb37cb1d5d2..c65105ae54c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3705,7 +3705,11 @@ public class DataRegion implements IDataRegionForQuery {
if (!regionObjectDir.isDirectory()) {
continue;
}
- CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName);
+ try {
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName);
+ } catch (Exception e) {
+ logger.error("Failed to execute object ttl check", e);
+ }
}
CompactionMetrics.getInstance()
.updateTTLCheckForObjectFileCost(System.currentTimeMillis() - startTime);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index a6358539f4f..3cd21d0e0a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -69,9 +69,11 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
@@ -595,7 +597,10 @@ public class CompactionUtils {
checkTTLAndDeleteExpiredObjectFile(currentFile, basicFileAttributes, lowerBoundInMS);
return;
}
- } catch (IOException ignored) {
+ } catch (FileNotFoundException | NoSuchFileException ignored) {
+ // may be deleted by other thread
+ } catch (IOException e) {
+ logger.warn("Failed to read file attributes: {}", currentFile, e);
}
}
File[] children = currentFile.listFiles();
@@ -606,8 +611,16 @@ public class CompactionUtils {
// block-aligned and reflects allocated directory entry blocks.
acquireCompactionReadRate(currentFile.length());
for (File child : children) {
- recursiveTTLCheckForTableDir(
- child, depth + 1, maxObjectFileDepth, canDistinguishDirectoryByFileName, lowerBoundInMS);
+ try {
+ recursiveTTLCheckForTableDir(
+ child,
+ depth + 1,
+ maxObjectFileDepth,
+ canDistinguishDirectoryByFileName,
+ lowerBoundInMS);
+ } catch (Exception e) {
+ logger.warn("Failed to check table dir: {}", child, e);
+ }
}
}
@@ -637,7 +650,10 @@ public class CompactionUtils {
FileMetrics.getInstance().decreaseObjectFileNum(1);
FileMetrics.getInstance().decreaseObjectFileSize(attributes.size());
logger.info("Remove object file {}, size is {}(byte)", file.getPath(), attributes.size());
- } catch (Exception ignored) {
+ } catch (FileNotFoundException | NoSuchFileException ignored) {
+ // may be deleted by other thread
+ } catch (Exception e) {
+ logger.warn("Failed to delete expired object file: {}", file, e);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
index 8348b878137..516b1489a20 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
@@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService {
ConcurrentHashMap.newKeySet();
private ReentrantLock lock = new ReentrantLock();
private volatile boolean init = false;
+ private volatile boolean isStoppingAllScheduleTask = false;
@Override
public void start() throws StartupException {
@@ -76,8 +77,13 @@ public class CompactionScheduleTaskManager implements IService {
logger.info("Compaction schedule task manager started.");
}
+ public boolean isStoppingAllScheduleTask() {
+ return isStoppingAllScheduleTask;
+ }
+
public void stopCompactionScheduleTasks() throws InterruptedException {
lock.lock();
+ isStoppingAllScheduleTask = true;
try {
for (Future<Void> task : submitCompactionScheduleTaskFutures) {
task.cancel(true);
@@ -121,6 +127,7 @@ public class CompactionScheduleTaskManager implements IService {
public void startScheduleTasks() {
lock.lock();
+ isStoppingAllScheduleTask = false;
try {
// compaction selector
for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
@@ -144,6 +151,7 @@ public class CompactionScheduleTaskManager implements IService {
@Override
public void stop() {
lock.lock();
+ isStoppingAllScheduleTask = true;
try {
if (!init) {
return;
@@ -160,6 +168,7 @@ public class CompactionScheduleTaskManager implements IService {
@Override
public void waitAndStop(long milliseconds) {
lock.lock();
+ isStoppingAllScheduleTask = true;
try {
if (!init) {
return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index 17ad0dd4334..714d232d625 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -72,9 +72,26 @@ public class CompactionScheduleTaskWorker implements Callable<Void> {
dataRegion.executeCompaction();
}
} catch (InterruptedException ignored) {
+ boolean isStoppedByUser =
+ CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
logger.info(
- "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted", workerId);
- return null;
+ "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}",
+ workerId,
+ isStoppedByUser);
+ if (isStoppedByUser) {
+ return null;
+ }
+ } catch (Exception e) {
+ logger.error(
+ "[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task",
+ workerId,
+ e);
+ } catch (Throwable t) {
+ logger.error(
+ "[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task and cannot recover",
+ workerId,
+ t);
+ throw t;
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index e15a908ac24..565f7077caf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -78,8 +78,21 @@ public class TTLScheduleTask implements Callable<Void> {
}
}
} catch (InterruptedException ignored) {
- logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
- return null;
+ boolean isStoppedByUser =
+ CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
+ logger.info(
+ "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}",
+ workerId,
+ isStoppedByUser);
+ if (isStoppedByUser) {
+ return null;
+ }
+ } catch (Exception e) {
+ logger.error("[TTLCheckTask-{}] Failed to execute ttl check", workerId, e);
+ } catch (Throwable t) {
+ logger.error(
+ "[TTLCheckTask-{}] Failed to execute ttl check and cannot recover", workerId, t);
+ throw t;
}
}
}