This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 5a54947d168 Pipe: Optimize Drop Pipe high priority tasks cannot obtain
SubTask object lock (#15404) (#15610)
5a54947d168 is described below
commit 5a54947d16898db13db56234488ac163b1c3fea9
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 29 18:28:11 2025 +0800
Pipe: Optimize Drop Pipe high priority tasks cannot obtain SubTask object
lock (#15404) (#15610)
(cherry picked from commit ca8ce24f2f0c1703164412a1d704c1166ebbafaa)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../subtask/connector/PipeConnectorSubtask.java | 71 ++++++------
.../task/subtask/PipeAbstractConnectorSubtask.java | 127 +++++++++++++--------
2 files changed, 115 insertions(+), 83 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 359376f0f6d..f316e2cf462 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -232,41 +232,46 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
- // synchronized to use the lastEvent & lastExceptionEvent
- synchronized (this) {
- // Here we discard the last event, and re-submit the pipe task to avoid
that the pipe task has
- // stopped submission but will not be stopped by critical exceptions,
because when it acquires
- // lock, the pipe is already dropped, thus it will do nothing.
- // Note that since we use a new thread to stop all the pipes, we will
not encounter deadlock
- // here. Or else we will.
- if (lastEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
- && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
- // Do not clear last event's reference count because it may be on
transferring
- lastEvent = null;
- // Submit self to avoid that the lastEvent has been retried "max
times" times and has
- // stopped executing.
- // 1. If the last event is still on execution, or submitted by the
previous "onSuccess" or
- // "onFailure", the "submitSelf" cause nothing.
- // 2. If the last event is waiting the instance lock to call
"onSuccess", then the callback
- // method will skip this turn of submission.
- // 3. If the last event is waiting to call "onFailure", then it will
be ignored because the
- // last event has been set to null.
- // 4. If the last event has called "onFailure" and caused the subtask
to stop submission,
- // it's submitted here and the "report" will wait for the "drop
pipe" lock to stop all
- // the pipes with critical exceptions. As illustrated above, the
"report" will do
- // nothing.
- submitSelf();
- }
+ highPriorityLockTaskCount.incrementAndGet();
+ try {
+ // synchronized to use the lastEvent & lastExceptionEvent
+ synchronized (this) {
+ // Here we discard the last event, and re-submit the pipe task to
avoid that the pipe task
+ // has stopped submission but will not be stopped by critical
exceptions, because when it
+ // acquires lock, the pipe is already dropped, thus it will do
nothing. Note that since we
+ // use a new thread to stop all the pipes, we will not encounter
deadlock here. Or else we
+ // will.
+ if (lastEvent instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
+ && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
+ // Do not clear the last event's reference counts because it may be
on transferring
+ lastEvent = null;
+ // Submit self to avoid that the lastEvent has been retried "max
times" times and has
+ // stopped executing.
+ // 1. If the last event is still on execution, or submitted by the
previous "onSuccess" or
+ // "onFailure", the "submitSelf" causes nothing.
+ // 2. If the last event is waiting the instance lock to call
"onSuccess", then the
+ // callback method will skip this turn of submission.
+ // 3. If the last event is waiting to call "onFailure", then it will
be ignored because
+ // the last event has been set to null.
+ // 4. If the last event has called "onFailure" and caused the
subtask to stop submission,
+ // it's submitted here and the "report" will wait for the "drop
pipe" lock to stop all
+ // the pipes with critical exceptions. As illustrated above, the
"report" will do
+ // nothing.
+ submitSelf();
+ }
- // We only clear the lastEvent's reference count when it's already on
failure. Namely, we
- // clear the lastExceptionEvent. It's safe to potentially clear it twice
because we have the
- // "nonnull" detection.
- if (lastExceptionEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())
- && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
- clearReferenceCountAndReleaseLastExceptionEvent();
+ // We only clear the lastEvent's reference counts when it's already on
failure. Namely, we
+ // clear the lastExceptionEvent. It's safe to potentially clear it
twice because we have the
+ // "nonnull" detection.
+ if (lastExceptionEvent instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())
+ && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId())
{
+ clearReferenceCountAndReleaseLastExceptionEvent();
+ }
}
+ } finally {
+ highPriorityLockTaskCount.decrementAndGet();
}
if (outputPipeConnector instanceof IoTDBConnector) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
index cfd987758e4..055a546e7e3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java
@@ -34,11 +34,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeAbstractConnectorSubtask.class);
+ // To ensure that high-priority tasks can obtain object locks first, a
counter is now used to save
+ // the number of high-priority tasks.
+ protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
+
// For output (transfer events to the target system in connector)
protected PipeConnector outputPipeConnector;
@@ -70,67 +75,76 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
}
@Override
- public synchronized void onSuccess(final Boolean
hasAtLeastOneEventProcessed) {
- isSubmitted = false;
+ public void onSuccess(final Boolean hasAtLeastOneEventProcessed) {
+ preScheduleLowPriorityTask(100);
+
+ synchronized (this) {
+ isSubmitted = false;
- super.onSuccess(hasAtLeastOneEventProcessed);
+ super.onSuccess(hasAtLeastOneEventProcessed);
+ }
}
@Override
- public synchronized void onFailure(final Throwable throwable) {
- isSubmitted = false;
+ public void onFailure(final Throwable throwable) {
+ preScheduleLowPriorityTask(100);
- if (isClosed.get()) {
- LOGGER.info(
- "onFailure in pipe transfer, ignored because the connector subtask
is dropped.",
- throwable);
- clearReferenceCountAndReleaseLastEvent(null);
- return;
- }
+ synchronized (this) {
+ isSubmitted = false;
- // We assume that the event is cleared as the "lastEvent" in processor
subtask and reaches the
- // connector subtask. Then, it may fail because of released resource and
block the other pipes
- // using the same connector. We simply discard it.
- if (lastExceptionEvent instanceof EnrichedEvent
- && ((EnrichedEvent) lastExceptionEvent).isReleased()) {
- LOGGER.info(
- "onFailure in pipe transfer, ignored because the failure event is
released.", throwable);
- submitSelf();
- return;
- }
+ if (isClosed.get()) {
+ LOGGER.info(
+ "onFailure in pipe transfer, ignored because the connector subtask
is dropped.",
+ throwable);
+ clearReferenceCountAndReleaseLastEvent(null);
+ return;
+ }
- // If lastExceptionEvent != lastEvent, it indicates that the lastEvent's
reference has been
- // changed because the pipe of it has been dropped. In that case, we just
discard the event.
- if (lastEvent != lastExceptionEvent) {
- LOGGER.info(
- "onFailure in pipe transfer, ignored because the failure event's
pipe is dropped.",
- throwable);
- clearReferenceCountAndReleaseLastExceptionEvent();
- submitSelf();
- return;
- }
+ // We assume that the event is cleared as the "lastEvent" in processor
subtask and reaches the
+ // connector subtask. Then, it may fail because of released resource and
block the other pipes
+ // using the same connector. We simply discard it.
+ if (lastExceptionEvent instanceof EnrichedEvent
+ && ((EnrichedEvent) lastExceptionEvent).isReleased()) {
+ LOGGER.info(
+ "onFailure in pipe transfer, ignored because the failure event is
released.",
+ throwable);
+ submitSelf();
+ return;
+ }
- if (throwable instanceof PipeConnectionException) {
- // Retry to connect to the target system if the connection is broken
- // We should reconstruct the client before re-submit the subtask
- if (onPipeConnectionException(throwable)) {
- // return if the pipe task should be stopped
+ // If lastExceptionEvent != lastEvent, it indicates that the lastEvent's
reference has been
+ // changed because the pipe of it has been dropped. In that case, we
just discard the event.
+ if (lastEvent != lastExceptionEvent) {
+ LOGGER.info(
+ "onFailure in pipe transfer, ignored because the failure event's
pipe is dropped.",
+ throwable);
+ clearReferenceCountAndReleaseLastExceptionEvent();
+ submitSelf();
return;
}
- }
- // Handle exceptions if any available clients exist
- // Notice that the PipeRuntimeConnectorCriticalException must be thrown
here
- // because the upper layer relies on this to stop all the related pipe
tasks
- // Other exceptions may cause the subtask to stop forever and can not be
restarted
- if (throwable instanceof PipeRuntimeConnectorCriticalException) {
- super.onFailure(throwable);
- } else {
- // Print stack trace for better debugging
- LOGGER.warn(
- "A non PipeRuntimeConnectorCriticalException occurred, will throw a
PipeRuntimeConnectorCriticalException.",
- throwable);
- super.onFailure(new
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+ if (throwable instanceof PipeConnectionException) {
+ // Retry to connect to the target system if the connection is broken
+ // We should reconstruct the client before re-submit the subtask
+ if (onPipeConnectionException(throwable)) {
+ // return if the pipe task should be stopped
+ return;
+ }
+ }
+
+ // Handle exceptions if any available clients exist
+ // Notice that the PipeRuntimeConnectorCriticalException must be thrown
here
+ // because the upper layer relies on this to stop all the related pipe
tasks
+ // Other exceptions may cause the subtask to stop forever and can not be
restarted
+ if (throwable instanceof PipeRuntimeConnectorCriticalException) {
+ super.onFailure(throwable);
+ } else {
+ // Print stack trace for better debugging
+ LOGGER.warn(
+ "A non PipeRuntimeConnectorCriticalException occurred, will throw
a PipeRuntimeConnectorCriticalException.",
+ throwable);
+ super.onFailure(new
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+ }
}
}
@@ -238,4 +252,17 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
lastExceptionEvent = null;
}
}
+
+ private void preScheduleLowPriorityTask(int maxRetries) {
+ while (highPriorityLockTaskCount.get() != 0L && maxRetries-- > 0) {
+ try {
+ // Introduce a short delay to avoid CPU spinning
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for the high priority lock
task.", e);
+ break;
+ }
+ }
+ }
}