This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4559929126e Fix: send set configuration only to target nodes and harden compaction schedule interruption handling (#17469)
4559929126e is described below

commit 4559929126e6504b11d7a56236bd0039379959aa
Author: shuwenwei <[email protected]>
AuthorDate: Tue Apr 14 16:09:11 2026 +0800

    Fix: send set configuration only to target nodes and harden compaction schedule interruption handling (#17469)
---
 .../it/db/it/IoTDBSetConfigurationTableIT.java     | 14 +++++++++++++
 .../iotdb/confignode/manager/node/NodeManager.java |  2 +-
 .../db/storageengine/dataregion/DataRegion.java    |  6 +++++-
 .../compaction/execute/utils/CompactionUtils.java  | 24 ++++++++++++++++++----
 .../schedule/CompactionScheduleTaskManager.java    |  9 ++++++++
 .../schedule/CompactionScheduleTaskWorker.java     | 21 +++++++++++++++++--
 .../compaction/schedule/TTLScheduleTask.java       | 17 +++++++++++++--
 7 files changed, 83 insertions(+), 10 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
index 2295b88f1bc..8a4bc388a59 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
@@ -109,6 +109,9 @@ public class IoTDBSetConfigurationTableIT {
         statement.execute(
             "set configuration 
inner_compaction_candidate_file_num='1',max_cross_compaction_candidate_file_num='1'
 on "
                 + dnId);
+        if (dnId == 5) {
+          statement.execute("set configuration 
compaction_schedule_thread_num='2' on 5");
+        }
       }
     } catch (Exception e) {
       Assert.fail(e.getMessage());
@@ -131,6 +134,17 @@ public class IoTDBSetConfigurationTableIT {
               "enable_cross_space_compaction=false",
               "inner_compaction_candidate_file_num=1",
               "max_cross_compaction_candidate_file_num=1"));
+      boolean scheduleThreadNumChanged =
+          checkConfigFileContains(
+              dnId,
+              EnvFactory.getEnv().getDataNodeWrapperList().get(i),
+              "compaction_schedule_thread_num=2");
+      if (scheduleThreadNumChanged && dnId != 5) {
+        Assert.fail();
+      }
+      if (!scheduleThreadNumChanged && dnId == 5) {
+        Assert.fail();
+      }
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index e3d775259d6..2e50c6b787b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -1052,7 +1052,7 @@ public class NodeManager {
     if (!targetDataNodes.isEmpty()) {
       DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
           new DataNodeAsyncRequestContext<>(
-              CnToDnAsyncRequestType.SET_CONFIGURATION, req, 
dataNodeLocationMap);
+              CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
       CnToDnInternalServiceAsyncRequestManager.getInstance()
           .sendAsyncRequestWithRetry(clientHandler);
       responseList.addAll(clientHandler.getResponseList());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index eb37cb1d5d2..c65105ae54c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3705,7 +3705,11 @@ public class DataRegion implements IDataRegionForQuery {
       if (!regionObjectDir.isDirectory()) {
         continue;
       }
-      CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, 
databaseName);
+      try {
+        
CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, 
databaseName);
+      } catch (Exception e) {
+        logger.error("Failed to execute object ttl check", e);
+      }
     }
     CompactionMetrics.getInstance()
         .updateTTLCheckForObjectFileCost(System.currentTimeMillis() - 
startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index a6358539f4f..3cd21d0e0a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -69,9 +69,11 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -595,7 +597,10 @@ public class CompactionUtils {
           checkTTLAndDeleteExpiredObjectFile(currentFile, basicFileAttributes, 
lowerBoundInMS);
           return;
         }
-      } catch (IOException ignored) {
+      } catch (FileNotFoundException | NoSuchFileException ignored) {
+        // may be deleted by other thread
+      } catch (IOException e) {
+        logger.warn("Failed to read file attributes: {}", currentFile, e);
       }
     }
     File[] children = currentFile.listFiles();
@@ -606,8 +611,16 @@ public class CompactionUtils {
     // block-aligned and reflects allocated directory entry blocks.
     acquireCompactionReadRate(currentFile.length());
     for (File child : children) {
-      recursiveTTLCheckForTableDir(
-          child, depth + 1, maxObjectFileDepth, 
canDistinguishDirectoryByFileName, lowerBoundInMS);
+      try {
+        recursiveTTLCheckForTableDir(
+            child,
+            depth + 1,
+            maxObjectFileDepth,
+            canDistinguishDirectoryByFileName,
+            lowerBoundInMS);
+      } catch (Exception e) {
+        logger.warn("Failed to check table dir: {}", child, e);
+      }
     }
   }
 
@@ -637,7 +650,10 @@ public class CompactionUtils {
       FileMetrics.getInstance().decreaseObjectFileNum(1);
       FileMetrics.getInstance().decreaseObjectFileSize(attributes.size());
       logger.info("Remove object file {}, size is {}(byte)", file.getPath(), 
attributes.size());
-    } catch (Exception ignored) {
+    } catch (FileNotFoundException | NoSuchFileException ignored) {
+      // may be deleted by other thread
+    } catch (Exception e) {
+      logger.warn("Failed to delete expired object file: {}", file, e);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
index 8348b878137..516b1489a20 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
@@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements 
IService {
       ConcurrentHashMap.newKeySet();
   private ReentrantLock lock = new ReentrantLock();
   private volatile boolean init = false;
+  private volatile boolean isStoppingAllScheduleTask = false;
 
   @Override
   public void start() throws StartupException {
@@ -76,8 +77,13 @@ public class CompactionScheduleTaskManager implements 
IService {
     logger.info("Compaction schedule task manager started.");
   }
 
+  public boolean isStoppingAllScheduleTask() {
+    return isStoppingAllScheduleTask;
+  }
+
   public void stopCompactionScheduleTasks() throws InterruptedException {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       for (Future<Void> task : submitCompactionScheduleTaskFutures) {
         task.cancel(true);
@@ -121,6 +127,7 @@ public class CompactionScheduleTaskManager implements 
IService {
 
   public void startScheduleTasks() {
     lock.lock();
+    isStoppingAllScheduleTask = false;
     try {
       // compaction selector
       for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
@@ -144,6 +151,7 @@ public class CompactionScheduleTaskManager implements 
IService {
   @Override
   public void stop() {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       if (!init) {
         return;
@@ -160,6 +168,7 @@ public class CompactionScheduleTaskManager implements 
IService {
   @Override
   public void waitAndStop(long milliseconds) {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       if (!init) {
         return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index 17ad0dd4334..714d232d625 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -72,9 +72,26 @@ public class CompactionScheduleTaskWorker implements 
Callable<Void> {
           dataRegion.executeCompaction();
         }
       } catch (InterruptedException ignored) {
+        boolean isStoppedByUser =
+            
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
         logger.info(
-            "[CompactionScheduleTaskWorker-{}] compaction schedule is 
interrupted", workerId);
-        return null;
+            "[CompactionScheduleTaskWorker-{}] compaction schedule is 
interrupted, isStopByUser: {}",
+            workerId,
+            isStoppedByUser);
+        if (isStoppedByUser) {
+          return null;
+        }
+      } catch (Exception e) {
+        logger.error(
+            "[CompactionScheduleTaskWorker-{}] Failed to execute compaction 
schedule task",
+            workerId,
+            e);
+      } catch (Throwable t) {
+        logger.error(
+            "[CompactionScheduleTaskWorker-{}] Failed to execute compaction 
schedule task and cannot recover",
+            workerId,
+            t);
+        throw t;
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index e15a908ac24..565f7077caf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -78,8 +78,21 @@ public class TTLScheduleTask implements Callable<Void> {
           }
         }
       } catch (InterruptedException ignored) {
-        logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
-        return null;
+        boolean isStoppedByUser =
+            
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
+        logger.info(
+            "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: 
{}",
+            workerId,
+            isStoppedByUser);
+        if (isStoppedByUser) {
+          return null;
+        }
+      } catch (Exception e) {
+        logger.error("[TTLCheckTask-{}] Failed to execute ttl check", 
workerId, e);
+      } catch (Throwable t) {
+        logger.error(
+            "[TTLCheckTask-{}] Failed to execute ttl check and cannot 
recover", workerId, t);
+        throw t;
       }
     }
   }

Reply via email to