This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f41f064858 Fix internal procedure deduplication (#18036)
7f41f064858 is described below

commit 7f41f064858f9376d7c6f7e443dc541918d20ac8
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:23:12 2026 +0800

    Fix internal procedure deduplication (#18036)
---
 .../iotdb/confignode/procedure/ProcedureExecutor.java |  1 -
 .../confignode/procedure/TimeoutExecutorThread.java   | 19 ++++++++++++++++---
 .../confignode/procedure/TestProcedureExecutor.java   | 10 ++++++++++
 3 files changed, 26 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index f3652dab1d0..bdaace3d768 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -298,7 +298,6 @@ public class ProcedureExecutor<Env> {
     if (interalProcedure == null) {
       return;
     }
-    interalProcedure.setState(ProcedureState.WAITING_TIMEOUT);
     timeoutExecutor.add(interalProcedure);
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index cfd7bf023b0..d8e16ca1b1f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -20,10 +20,13 @@
 package org.apache.iotdb.confignode.procedure;
 
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +37,7 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
   private static final int DELAY_QUEUE_TIMEOUT = 20;
   private final ProcedureExecutor<Env> executor;
   private final DelayQueue<ProcedureDelayContainer<Env>> queue = new 
DelayQueue<>();
+  private final Set<Procedure<Env>> registeredInternalProcedures = 
ConcurrentHashMap.newKeySet();
 
   public TimeoutExecutorThread(
       ProcedureExecutor<Env> envProcedureExecutor, ThreadGroup threadGroup, 
String name) {
@@ -44,12 +48,21 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
 
   public void add(Procedure<Env> procedure) {
     ProcedureDelayContainer<Env> delayTask = new 
ProcedureDelayContainer<>(procedure);
+    if (procedure instanceof InternalProcedure) {
+      if (!registeredInternalProcedures.add(procedure)) {
+        return;
+      }
+      procedure.setState(ProcedureState.WAITING_TIMEOUT);
+    }
     queue.remove(delayTask);
     queue.add(delayTask);
   }
 
   public boolean remove(Procedure<Env> procedure) {
-    return queue.remove(new ProcedureDelayContainer<>(procedure)) || 
procedure.isFinished();
+    boolean unregistered =
+        procedure instanceof InternalProcedure && 
registeredInternalProcedures.remove(procedure);
+    boolean removed = queue.remove(new ProcedureDelayContainer<>(procedure));
+    return unregistered || removed || procedure.isFinished();
   }
 
   private ProcedureDelayContainer<Env> takeQuietly() {
@@ -70,12 +83,12 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
       }
       Procedure<Env> procedure = delayTask.getProcedure();
       if (procedure instanceof InternalProcedure) {
-        if (procedure.isFinished()) {
+        if (!registeredInternalProcedures.contains(procedure) || 
procedure.isFinished()) {
           continue;
         }
         InternalProcedure internal = (InternalProcedure) procedure;
         internal.periodicExecute(executor.getEnvironment());
-        if (!procedure.isFinished()) {
+        if (!procedure.isFinished() && 
registeredInternalProcedures.contains(procedure)) {
           procedure.updateTimestamp();
           queue.add(delayTask);
         }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index ba5f635507a..75a7168c1f6 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -127,6 +127,16 @@ public class TestProcedureExecutor extends 
TestProcedureBase {
     Assert.assertFalse(internalProcedure.awaitExecution(300, 
TimeUnit.MILLISECONDS));
     Assert.assertEquals(1, internalProcedure.getExecutionCount());
 
+    procExecutor.addInternalProcedure(internalProcedure);
+    Assert.assertFalse(internalProcedure.awaitExecution(300, 
TimeUnit.MILLISECONDS));
+    Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+    Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+
+    procExecutor.addInternalProcedure(internalProcedure);
+    Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+    Assert.assertEquals(2, internalProcedure.getExecutionCount());
+
     Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
   }
 

Reply via email to