This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 94bf8a2754f Fix internal procedure deduplication (#18036) (#18043)
94bf8a2754f is described below
commit 94bf8a2754fcd96e4b0b9ab68dc76a2e1bc4aa9a
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:17:08 2026 +0800
Fix internal procedure deduplication (#18036) (#18043)
(cherry picked from commit 7f41f064858f9376d7c6f7e443dc541918d20ac8)
---
.../confignode/procedure/ProcedureExecutor.java | 1 -
.../confignode/procedure/TimeoutExecutorThread.java | 20 +++++++++++++++++---
.../confignode/procedure/TestProcedureExecutor.java | 10 ++++++++++
3 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 1633f78eec9..f79ff888c38 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -289,7 +289,6 @@ public class ProcedureExecutor<Env> {
if (interalProcedure == null) {
return;
}
- interalProcedure.setState(ProcedureState.WAITING_TIMEOUT);
timeoutExecutor.add(interalProcedure);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index c998ad903c2..8ade48f4001 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.confignode.procedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@@ -28,6 +32,7 @@ public class TimeoutExecutorThread<Env> extends StoppableThread {
private static final int DELAY_QUEUE_TIMEOUT = 20;
private final ProcedureExecutor<Env> executor;
private final DelayQueue<ProcedureDelayContainer<Env>> queue = new
DelayQueue<>();
+ private final Set<Procedure<Env>> registeredInternalProcedures =
ConcurrentHashMap.newKeySet();
public TimeoutExecutorThread(
ProcedureExecutor<Env> envProcedureExecutor, ThreadGroup threadGroup,
String name) {
@@ -38,12 +43,21 @@ public class TimeoutExecutorThread<Env> extends StoppableThread {
public void add(Procedure<Env> procedure) {
ProcedureDelayContainer<Env> delayTask = new
ProcedureDelayContainer<>(procedure);
+ if (procedure instanceof InternalProcedure) {
+ if (!registeredInternalProcedures.add(procedure)) {
+ return;
+ }
+ procedure.setState(ProcedureState.WAITING_TIMEOUT);
+ }
queue.remove(delayTask);
queue.add(delayTask);
}
public boolean remove(Procedure<Env> procedure) {
- return queue.remove(new ProcedureDelayContainer<>(procedure)) ||
procedure.isFinished();
+ boolean unregistered =
+ procedure instanceof InternalProcedure &&
registeredInternalProcedures.remove(procedure);
+ boolean removed = queue.remove(new ProcedureDelayContainer<>(procedure));
+ return unregistered || removed || procedure.isFinished();
}
private ProcedureDelayContainer<Env> takeQuietly() {
@@ -64,12 +78,12 @@ public class TimeoutExecutorThread<Env> extends StoppableThread {
}
Procedure<Env> procedure = delayTask.getProcedure();
if (procedure instanceof InternalProcedure) {
- if (procedure.isFinished()) {
+ if (!registeredInternalProcedures.contains(procedure) ||
procedure.isFinished()) {
continue;
}
InternalProcedure internal = (InternalProcedure) procedure;
internal.periodicExecute(executor.getEnvironment());
- if (!procedure.isFinished()) {
+ if (!procedure.isFinished() &&
registeredInternalProcedures.contains(procedure)) {
procedure.updateTimestamp();
queue.add(delayTask);
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index ba5f635507a..75a7168c1f6 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -127,6 +127,16 @@ public class TestProcedureExecutor extends TestProcedureBase {
Assert.assertFalse(internalProcedure.awaitExecution(300,
TimeUnit.MILLISECONDS));
Assert.assertEquals(1, internalProcedure.getExecutionCount());
+ procExecutor.addInternalProcedure(internalProcedure);
+ Assert.assertFalse(internalProcedure.awaitExecution(300,
TimeUnit.MILLISECONDS));
+ Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+ Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+
+ procExecutor.addInternalProcedure(internalProcedure);
+ Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+ Assert.assertEquals(2, internalProcedure.getExecutionCount());
+
Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
}