This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8106d97db49 Revert "Fix CN pipe procedures restore dead lock (#16324)" (#16384)
8106d97db49 is described below
commit 8106d97db4954f74b77c26084f2735a38deb4c5b
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Sep 10 16:20:05 2025 +0800
Revert "Fix CN pipe procedures restore dead lock (#16324)" (#16384)
This reverts commit b422e9a5c59391a2aa17234c3cf5a230bbce9830.
---
.../pipe/coordinator/runtime/PipeMetaSyncer.java | 4 +--
.../runtime/heartbeat/PipeHeartbeatParser.java | 2 +-
.../pipe/coordinator/task/PipeTaskCoordinator.java | 14 ++++----
.../coordinator/task/PipeTaskCoordinatorLock.java | 13 +++-----
.../subscription/SubscriptionCoordinator.java | 2 +-
.../confignode/procedure/ProcedureExecutor.java | 38 +---------------------
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 19 +++--------
.../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 14 +++-----
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 14 +++-----
...bstractOperateSubscriptionAndPipeProcedure.java | 7 +---
11 files changed, 33 insertions(+), 96 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index aaafcda346d..1df5ac021ca 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -146,7 +146,7 @@ public class PipeMetaSyncer {
private boolean autoRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
- configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+ configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn("Failed to acquire pipe lock for auto restart pipe task.");
return false;
@@ -160,7 +160,7 @@ public class PipeMetaSyncer {
private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
- configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+ configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn("Failed to acquire pipe lock for handling successful
restart.");
return false;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 8e13a0460e4..ace07f5e2d3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -99,7 +99,7 @@ public class PipeHeartbeatParser {
.submit(
() -> {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-
configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(
"Failed to acquire lock when parseHeartbeat from node
(id={}).", nodeId);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 25f8e0c0cc5..4aaf3ab46c3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -37,7 +37,6 @@ import
org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,11 +68,10 @@ public class PipeTaskCoordinator {
* @return the pipe task info holder, which can be used to get the pipe task
info. The holder is
* null if the lock is not acquired.
*/
- public Pair<AtomicReference<PipeTaskInfo>, Long> tryLock() {
- long lockSeqId = pipeTaskCoordinatorLock.tryLock();
- if (lockSeqId != -1) {
+ public AtomicReference<PipeTaskInfo> tryLock() {
+ if (pipeTaskCoordinatorLock.tryLock()) {
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+ return pipeTaskInfoHolder;
}
return null;
@@ -85,10 +83,10 @@ public class PipeTaskCoordinator {
* @return the {@link PipeTaskInfo} holder, which can be used to get the
{@link PipeTaskInfo}.
* Wait until lock is acquired
*/
- public Pair<AtomicReference<PipeTaskInfo>, Long> lock() {
- long lockSeqId = pipeTaskCoordinatorLock.lock();
+ public AtomicReference<PipeTaskInfo> lock() {
+ pipeTaskCoordinatorLock.lock();
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+ return pipeTaskInfoHolder;
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0caf620e7f2..e57add9a001 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -37,9 +37,8 @@ public class PipeTaskCoordinatorLock {
private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
private final AtomicLong idGenerator = new AtomicLong(0);
- private final AtomicLong lockSeqIdGenerator = new AtomicLong(0);
- public long lock() {
+ public void lock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.debug(
@@ -51,17 +50,15 @@ public class PipeTaskCoordinatorLock {
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
- return lockSeqIdGenerator.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current
thread: {}",
Thread.currentThread().getName());
- return -1;
}
}
- public long tryLock() {
+ public boolean tryLock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.debug(
@@ -73,20 +70,20 @@ public class PipeTaskCoordinatorLock {
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
- return lockSeqIdGenerator.incrementAndGet();
+ return true;
} else {
LOGGER.info(
"PipeTaskCoordinator lock (id: {}) failed to acquire by thread {}
because of timeout",
id,
Thread.currentThread().getName());
- return -1;
+ return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current
thread: {}",
Thread.currentThread().getName());
- return -1;
+ return false;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index d90e1bbaa30..b52f958d30a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -79,7 +79,7 @@ public class SubscriptionCoordinator {
/////////////////////////////// Lock ///////////////////////////////
public AtomicReference<SubscriptionInfo> tryLock() {
- if (coordinatorLock.tryLock() != -1) {
+ if (coordinatorLock.tryLock()) {
subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
return subscriptionInfoHolder;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index f0f28ec2741..0d8368583b4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
@@ -38,12 +37,10 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,7 +49,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
@@ -120,41 +116,9 @@ public class ProcedureExecutor<Env> {
recover();
}
- /***
- * Filter out pipe procedures that do not need to re-acquire lock and
re-execute when there are multiple locked pipe procedures during restore.
- * @return non pipe procedures and one pipe procedure with max lock seq id
(if there is.)
- */
- private List<Procedure<Env>> filteredProcedureList(final
List<Procedure<Env>> procedures) {
- List<Procedure<Env>> nonPipeOrLockedProcedures =
- procedures.stream()
- .filter(p -> !(p instanceof AbstractOperatePipeProcedureV2) ||
!p.isLockedWhenLoading())
- .collect(Collectors.toList());
-
- List<AbstractOperatePipeProcedureV2> lockedPipeProcedures =
- procedures.stream()
- .filter(p -> p instanceof AbstractOperatePipeProcedureV2 &&
p.isLockedWhenLoading())
- .map(AbstractOperatePipeProcedureV2.class::cast)
- .collect(Collectors.toList());
- Optional<Procedure<Env>> maxPipeProcedure =
- lockedPipeProcedures.stream()
-
.max(Comparator.comparingLong(AbstractOperatePipeProcedureV2::getLockSeqId))
- .map(p -> (Procedure<Env>) p);
-
- if (lockedPipeProcedures.size() > 1) {
- LOG.warn(
- "[Procedure restore]Detected multiple locked pipe procedures in
procedure executor {}, only keep last one {}",
- lockedPipeProcedures,
- maxPipeProcedure.get());
- }
-
- maxPipeProcedure.ifPresent(nonPipeOrLockedProcedures::add);
- return nonPipeOrLockedProcedures;
- }
-
private void recover() {
// 1.Build rollback stack
- List<Procedure<Env>> procedureList =
- filteredProcedureList(getProcedureListFromDifferentVersion());
+ List<Procedure<Env>> procedureList =
getProcedureListFromDifferentVersion();
// Load procedure wal file
for (Procedure<Env> proc : procedureList) {
if (proc.isFinished()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 0dde52ab5dd..25466d33983 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,17 +94,17 @@ public abstract class AbstractOperatePipeProcedureV2
// This variable should not be serialized into procedure store,
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
- protected long lockSeqId;
private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
"Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
-
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
- lockSeqId = lockRes.right;
- return lockRes.left;
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .lock();
}
@Override
@@ -189,10 +188,6 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
- public long getLockSeqId() {
- return lockSeqId;
- }
-
protected abstract PipeTaskOperation getOperation();
/**
@@ -617,15 +612,11 @@ public abstract class AbstractOperatePipeProcedureV2
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
- ReadWriteIOUtils.write(lockSeqId, stream);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
isRollbackFromOperateOnDataNodesSuccessful =
ReadWriteIOUtils.readBool(byteBuffer);
- if (byteBuffer.remaining() >= Long.BYTES) {
- lockSeqId = ReadWriteIOUtils.readLong(byteBuffer);
- }
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 28c59933cb0..665a3782a91 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -122,7 +122,7 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
final SubscriptionCoordinator subscriptionCoordinator =
env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator();
- final AtomicReference<PipeTaskInfo> pipeTaskInfo =
pipeTaskCoordinator.lock().left;
+ final AtomicReference<PipeTaskInfo> pipeTaskInfo =
pipeTaskCoordinator.lock();
pipePluginCoordinator.lock();
SubscriptionInfo subscriptionInfo =
subscriptionCoordinator.getSubscriptionInfo();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 6b2fec6f381..401859f0a7e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -31,7 +31,6 @@ import
org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,14 +64,11 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
@Override
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
- lockSeqId = lockRes.right;
- return lockRes.left;
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 81ed75b7db2..393a8bd5ab8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,14 +73,11 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
@Override
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
- lockSeqId = lockRes.right;
- return lockRes.left;
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 16391a9ef58..de90951262c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -50,12 +50,7 @@ public abstract class
AbstractOperateSubscriptionAndPipeProcedure
LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.",
getProcId());
pipeTaskInfo =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .lock()
- .left;
+
configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
} else {