This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7f41f064858 Fix internal procedure deduplication (#18036)
7f41f064858 is described below
commit 7f41f064858f9376d7c6f7e443dc541918d20ac8
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:23:12 2026 +0800
Fix internal procedure deduplication (#18036)
---
.../iotdb/confignode/procedure/ProcedureExecutor.java | 1 -
.../confignode/procedure/TimeoutExecutorThread.java | 19 ++++++++++++++++---
.../confignode/procedure/TestProcedureExecutor.java | 10 ++++++++++
3 files changed, 26 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index f3652dab1d0..bdaace3d768 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -298,7 +298,6 @@ public class ProcedureExecutor<Env> {
if (interalProcedure == null) {
return;
}
- interalProcedure.setState(ProcedureState.WAITING_TIMEOUT);
timeoutExecutor.add(interalProcedure);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index cfd7bf023b0..d8e16ca1b1f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -20,10 +20,13 @@
package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@@ -34,6 +37,7 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
private static final int DELAY_QUEUE_TIMEOUT = 20;
private final ProcedureExecutor<Env> executor;
private final DelayQueue<ProcedureDelayContainer<Env>> queue = new
DelayQueue<>();
+ private final Set<Procedure<Env>> registeredInternalProcedures =
ConcurrentHashMap.newKeySet();
public TimeoutExecutorThread(
ProcedureExecutor<Env> envProcedureExecutor, ThreadGroup threadGroup,
String name) {
@@ -44,12 +48,21 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
public void add(Procedure<Env> procedure) {
ProcedureDelayContainer<Env> delayTask = new
ProcedureDelayContainer<>(procedure);
+ if (procedure instanceof InternalProcedure) {
+ if (!registeredInternalProcedures.add(procedure)) {
+ return;
+ }
+ procedure.setState(ProcedureState.WAITING_TIMEOUT);
+ }
queue.remove(delayTask);
queue.add(delayTask);
}
public boolean remove(Procedure<Env> procedure) {
- return queue.remove(new ProcedureDelayContainer<>(procedure)) ||
procedure.isFinished();
+ boolean unregistered =
+ procedure instanceof InternalProcedure &&
registeredInternalProcedures.remove(procedure);
+ boolean removed = queue.remove(new ProcedureDelayContainer<>(procedure));
+ return unregistered || removed || procedure.isFinished();
}
private ProcedureDelayContainer<Env> takeQuietly() {
@@ -70,12 +83,12 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
}
Procedure<Env> procedure = delayTask.getProcedure();
if (procedure instanceof InternalProcedure) {
- if (procedure.isFinished()) {
+ if (!registeredInternalProcedures.contains(procedure) ||
procedure.isFinished()) {
continue;
}
InternalProcedure internal = (InternalProcedure) procedure;
internal.periodicExecute(executor.getEnvironment());
- if (!procedure.isFinished()) {
+ if (!procedure.isFinished() &&
registeredInternalProcedures.contains(procedure)) {
procedure.updateTimestamp();
queue.add(delayTask);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
index ba5f635507a..75a7168c1f6 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java
@@ -127,6 +127,16 @@ public class TestProcedureExecutor extends
TestProcedureBase {
Assert.assertFalse(internalProcedure.awaitExecution(300,
TimeUnit.MILLISECONDS));
Assert.assertEquals(1, internalProcedure.getExecutionCount());
+ procExecutor.addInternalProcedure(internalProcedure);
+ Assert.assertFalse(internalProcedure.awaitExecution(300,
TimeUnit.MILLISECONDS));
+ Assert.assertEquals(1, internalProcedure.getExecutionCount());
+
+ Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
+
+ procExecutor.addInternalProcedure(internalProcedure);
+ Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS));
+ Assert.assertEquals(2, internalProcedure.getExecutionCount());
+
Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure));
}