This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b422e9a5c59 Fix CN pipe procedures restore dead lock (#16324)
b422e9a5c59 is described below
commit b422e9a5c59391a2aa17234c3cf5a230bbce9830
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Sep 10 03:19:00 2025 -0500
Fix CN pipe procedures restore dead lock (#16324)
* fix pipe
* fix
* rename
* fix
* add comment
---
.../pipe/coordinator/runtime/PipeMetaSyncer.java | 4 +--
.../runtime/heartbeat/PipeHeartbeatParser.java | 2 +-
.../pipe/coordinator/task/PipeTaskCoordinator.java | 14 ++++----
.../coordinator/task/PipeTaskCoordinatorLock.java | 13 +++++---
.../subscription/SubscriptionCoordinator.java | 2 +-
.../confignode/procedure/ProcedureExecutor.java | 38 +++++++++++++++++++++-
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 19 ++++++++---
.../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 14 +++++---
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 14 +++++---
...bstractOperateSubscriptionAndPipeProcedure.java | 7 +++-
11 files changed, 96 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index 1df5ac021ca..aaafcda346d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -146,7 +146,7 @@ public class PipeMetaSyncer {
private boolean autoRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
- configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
+ configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
if (pipeTaskInfo == null) {
LOGGER.warn("Failed to acquire pipe lock for auto restart pipe task.");
return false;
@@ -160,7 +160,7 @@ public class PipeMetaSyncer {
private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
- configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
+ configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
if (pipeTaskInfo == null) {
LOGGER.warn("Failed to acquire pipe lock for handling successful
restart.");
return false;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index ace07f5e2d3..8e13a0460e4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -99,7 +99,7 @@ public class PipeHeartbeatParser {
.submit(
() -> {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
+
configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
if (pipeTaskInfo == null) {
LOGGER.warn(
"Failed to acquire lock when parseHeartbeat from node
(id={}).", nodeId);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 4aaf3ab46c3..25f8e0c0cc5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,10 +69,11 @@ public class PipeTaskCoordinator {
- * @return the pipe task info holder, which can be used to get the pipe task info. The holder is
- * null if the lock is not acquired.
+ * @return a pair of (pipe task info holder, lock seq id). The pair itself is never null: if the
+ * lock is not acquired, the holder is null and the lock seq id is -1.
*/
- public AtomicReference<PipeTaskInfo> tryLock() {
- if (pipeTaskCoordinatorLock.tryLock()) {
+ public Pair<AtomicReference<PipeTaskInfo>, Long> tryLock() {
+ long lockSeqId = pipeTaskCoordinatorLock.tryLock();
+ if (lockSeqId != -1) {
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return pipeTaskInfoHolder;
+ return new Pair<>(pipeTaskInfoHolder, lockSeqId);
}
- return null;
+ return new Pair<>(null, -1L); // never null: callers dereference .left/.right directly
@@ -83,10 +85,10 @@ public class PipeTaskCoordinator {
* @return the {@link PipeTaskInfo} holder, which can be used to get the
{@link PipeTaskInfo}.
* Wait until lock is acquired
*/
- public AtomicReference<PipeTaskInfo> lock() {
- pipeTaskCoordinatorLock.lock();
+ public Pair<AtomicReference<PipeTaskInfo>, Long> lock() {
+ long lockSeqId = pipeTaskCoordinatorLock.lock();
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return pipeTaskInfoHolder;
+ return new Pair<>(pipeTaskInfoHolder, lockSeqId);
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index e57add9a001..0caf620e7f2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -37,8 +37,9 @@ public class PipeTaskCoordinatorLock {
private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
private final AtomicLong idGenerator = new AtomicLong(0);
+ private final AtomicLong lockSeqIdGenerator = new AtomicLong(0);
- public void lock() {
+ public long lock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.debug(
@@ -50,15 +51,17 @@ public class PipeTaskCoordinatorLock {
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
+ return lockSeqIdGenerator.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current
thread: {}",
Thread.currentThread().getName());
+ return -1;
}
}
- public boolean tryLock() {
+ public long tryLock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.debug(
@@ -70,20 +73,20 @@ public class PipeTaskCoordinatorLock {
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
- return true;
+ return lockSeqIdGenerator.incrementAndGet();
} else {
LOGGER.info(
"PipeTaskCoordinator lock (id: {}) failed to acquire by thread {}
because of timeout",
id,
Thread.currentThread().getName());
- return false;
+ return -1;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current
thread: {}",
Thread.currentThread().getName());
- return false;
+ return -1;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index b52f958d30a..d90e1bbaa30 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -79,7 +79,7 @@ public class SubscriptionCoordinator {
/////////////////////////////// Lock ///////////////////////////////
public AtomicReference<SubscriptionInfo> tryLock() {
- if (coordinatorLock.tryLock()) {
+ if (coordinatorLock.tryLock() != -1) {
subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
return subscriptionInfoHolder;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 0d8368583b4..f0f28ec2741 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
@@ -37,10 +38,12 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -49,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
@@ -116,9 +120,41 @@ public class ProcedureExecutor<Env> {
recover();
}
+ /**
+ * Filter out pipe procedures that do not need to re-acquire the lock and re-execute when there
+ * are multiple locked pipe procedures during restore.
+ * @return all non-pipe or unlocked procedures, plus the locked pipe procedure with the max lock seq id (if any)
+ */
+ private List<Procedure<Env>> filteredProcedureList(final List<Procedure<Env>> procedures) {
+ List<Procedure<Env>> nonPipeOrUnlockedProcedures =
+ procedures.stream()
+ .filter(p -> !(p instanceof AbstractOperatePipeProcedureV2) || !p.isLockedWhenLoading())
+ .collect(Collectors.toList());
+
+ List<Procedure<Env>> lockedPipeProcedures =
+ procedures.stream()
+ .filter(p -> p instanceof AbstractOperatePipeProcedureV2 && p.isLockedWhenLoading())
+ .collect(Collectors.toList());
+ // Lock seq ids come from an incrementing generator, so the max one marks the latest lock holder.
+ Optional<Procedure<Env>> maxPipeProcedure =
+ lockedPipeProcedures.stream()
+ .max(Comparator.comparingLong(p -> ((AbstractOperatePipeProcedureV2) p).getLockSeqId()));
+
+ if (lockedPipeProcedures.size() > 1) {
+ LOG.warn(
+ "[Procedure restore]Detected multiple locked pipe procedures in procedure executor {}, only keep last one {}",
+ lockedPipeProcedures,
+ maxPipeProcedure.get());
+ }
+
+ maxPipeProcedure.ifPresent(nonPipeOrUnlockedProcedures::add);
+ return nonPipeOrUnlockedProcedures;
+ }
+
private void recover() {
// 1.Build rollback stack
- List<Procedure<Env>> procedureList =
getProcedureListFromDifferentVersion();
+ List<Procedure<Env>> procedureList =
+ filteredProcedureList(getProcedureListFromDifferentVersion());
// Load procedure wal file
for (Procedure<Env> proc : procedureList) {
if (proc.isFinished()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 25466d33983..0dde52ab5dd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,17 +95,17 @@ public abstract class AbstractOperatePipeProcedureV2
// This variable should not be serialized into procedure store,
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
+ protected long lockSeqId;
private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
"Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- return configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .lock();
+ Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
+
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
+ lockSeqId = lockRes.right;
+ return lockRes.left;
}
@Override
@@ -188,6 +189,10 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
+ public long getLockSeqId() {
+ return lockSeqId;
+ }
+
protected abstract PipeTaskOperation getOperation();
/**
@@ -612,11 +617,15 @@ public abstract class AbstractOperatePipeProcedureV2
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
+ ReadWriteIOUtils.write(lockSeqId, stream); // persisted so restore can keep only the locked pipe procedure with the max lock seq id
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
isRollbackFromOperateOnDataNodesSuccessful =
ReadWriteIOUtils.readBool(byteBuffer);
+ if (byteBuffer.remaining() >= Long.BYTES) { // NOTE(review): old-format buffers lack lockSeqId, but if a subclass reads its own fields after super.deserialize(), >= 8 bytes may still remain here and would be misread as lockSeqId — confirm subclass formats; a version marker may be needed
+ lockSeqId = ReadWriteIOUtils.readLong(byteBuffer);
+ }
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 665a3782a91..28c59933cb0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -122,7 +122,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
final SubscriptionCoordinator subscriptionCoordinator =
env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator();
- final AtomicReference<PipeTaskInfo> pipeTaskInfo =
pipeTaskCoordinator.lock();
+ final AtomicReference<PipeTaskInfo> pipeTaskInfo =
pipeTaskCoordinator.lock().left;
pipePluginCoordinator.lock();
SubscriptionInfo subscriptionInfo =
subscriptionCoordinator.getSubscriptionInfo();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 401859f0a7e..6b2fec6f381 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,11 +65,14 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
@Override
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- return configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
+ Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
+ configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
+ lockSeqId = lockRes.right;
+ return lockRes.left;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 393a8bd5ab8..81ed75b7db2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,11 +74,14 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
@Override
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- return configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
+ Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
+ configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
+ lockSeqId = lockRes.right;
+ return lockRes.left;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index de90951262c..16391a9ef58 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -50,7 +50,12 @@ public abstract class
AbstractOperateSubscriptionAndPipeProcedure
LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.",
getProcId());
pipeTaskInfo =
-
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
+ configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .lock()
+ .left;
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
} else {