This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 94bf8a2754f Fix internal procedure deduplication (#18036) (#18043)
94bf8a2754f is described below

commit 94bf8a2754fcd96e4b0b9ab68dc76a2e1bc4aa9a
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:17:08 2026 +0800

    Fix internal procedure deduplication (#18036) (#18043)
    
    (cherry picked from commit 7f41f064858f9376d7c6f7e443dc541918d20ac8)
---
 .../confignode/procedure/ProcedureExecutor.java      |  1 -
 .../confignode/procedure/TimeoutExecutorThread.java  | 20 +++++++++++++++++---
 .../confignode/procedure/TestProcedureExecutor.java  | 10 ++++++++++
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 1633f78eec9..f79ff888c38 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -289,7 +289,6 @@ public class ProcedureExecutor<Env> {
     if (interalProcedure == null) {
       return;
     }
-    interalProcedure.setState(ProcedureState.WAITING_TIMEOUT);
     timeoutExecutor.add(interalProcedure);
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index c998ad903c2..8ade48f4001 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.confignode.procedure;
 
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
@@ -28,6 +32,7 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
   private static final int DELAY_QUEUE_TIMEOUT = 20;
   private final ProcedureExecutor<Env> executor;
   private final DelayQueue<ProcedureDelayContainer<Env>> queue = new 
DelayQueue<>();
+  private final Set<Procedure<Env>> registeredInternalProcedures = 
ConcurrentHashMap.newKeySet();
 
   public TimeoutExecutorThread(
       ProcedureExecutor<Env> envProcedureExecutor, ThreadGroup threadGroup, 
String name) {
@@ -38,12 +43,21 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
 
   public void add(Procedure<Env> procedure) {
     ProcedureDelayContainer<Env> delayTask = new 
ProcedureDelayContainer<>(procedure);
+    if (procedure instanceof InternalProcedure) {
+      if (!registeredInternalProcedures.add(procedure)) {
+        return;
+      }
+      procedure.setState(ProcedureState.WAITING_TIMEOUT);
+    }
     queue.remove(delayTask);
     queue.add(delayTask);
   }
 
   public boolean remove(Procedure<Env> procedure) {
-    return queue.remove(new ProcedureDelayContainer<>(procedure)) || 
procedure.isFinished();
+    boolean unregistered =
+        procedure instanceof InternalProcedure && 
registeredInternalProcedures.remove(procedure);
+    boolean removed = queue.remove(new ProcedureDelayContainer<>(procedure));
+    return unregistered || removed || procedure.isFinished();
   }
 
   private ProcedureDelayContainer<Env> takeQuietly() {
@@ -64,12 +78,12 @@ public class TimeoutExecutorThread<Env> extends 
StoppableThread {
       }
       Procedure<Env> procedure = delayTask.getProcedure();
       if (procedure instanceof InternalProcedure) {
-        if (procedure.isFinished()) {
+        if (!registeredInternalProcedures.contains(procedure) || 
procedure.isFinished()) {
           continue;
         }
         InternalProcedure internal = (InternalProcedure) procedure;
         internal.periodicExecute(executor.getEnvironment());
-        if (!procedure.isFinished()) {
+        if (!procedure.isFinished() && 
registeredInternalProcedures.contains(procedure)) {
           procedure.updateTimestamp();
           queue.add(delayTask);
         }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index ba5f635507a..75a7168c1f6 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -127,6 +127,16 @@ public class TestProcedureExecutor extends 
TestProcedureBase {
     Assert.assertFalse(internalProcedure.awaitExecution(300, 
TimeUnit.MILLISECONDS));
     Assert.assertEquals(1, internalProcedure.getExecutionCount());
 
+    procExecutor.addInternalProcedure(internalProcedure);
+    Assert.assertFalse(internalProcedure.awaitExecution(300, 
TimeUnit.MILLISECONDS));
+    Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+    Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+
+    procExecutor.addInternalProcedure(internalProcedure);
+    Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+    Assert.assertEquals(2, internalProcedure.getExecutionCount());
+
     Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
   }
 

Reply via email to