This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 84b2fd93b9f Pipe: avoid event accumulation in the stale sink pending
queue & promptly clean up closed processor subtasks & close parser when
releasing phantom reference of tsfile event (#14820) (#14835)
84b2fd93b9f is described below
commit 84b2fd93b9f41b0352c6284c46eb52e2fdfa0933
Author: VGalaxies <[email protected]>
AuthorDate: Fri Feb 14 17:55:52 2025 +0800
Pipe: avoid event accumulation in the stale sink pending queue & promptly
clean up closed processor subtasks & close parser when releasing phantom
reference of tsfile event (#14820) (#14835)
---
.../PipeRealtimePriorityBlockingQueue.java | 2 +
.../subtask/processor/PipeProcessorSubtask.java | 5 +--
.../processor/PipeProcessorSubtaskWorker.java | 13 ++----
.../common/tsfile/PipeTsFileInsertionEvent.java | 51 ++++++++++++++++------
.../task/connection/BlockingPendingQueue.java | 14 ++++++
5 files changed, 59 insertions(+), 26 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index 185b60c29e9..3edbcef539c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -51,6 +51,8 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
@Override
public boolean directOffer(final Event event) {
+ checkBeforeOffer(event);
+
if (event instanceof TsFileInsertionEvent) {
tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 7f29b3c55f9..b4bd024bfec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -228,10 +228,9 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
PipeProcessorMetrics.getInstance().deregister(taskID);
try {
isClosed.set(true);
-
- // pipeProcessor closes first, then no more events will be added into outputEventCollector.
- // only after that, outputEventCollector can be closed.
pipeProcessor.close();
+ // It is important to note that even if the subtask and its corresponding processor are
+ // closed, the execution thread may still deliver events downstream.
} catch (final Exception e) {
LOGGER.info(
"Exception occurred when closing pipe processor subtask {}, root cause: {}",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java
index 44fd578c84d..813d0731721 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java
@@ -33,9 +33,6 @@ public class PipeProcessorSubtaskWorker extends WrappedRunnable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeProcessorSubtaskWorker.class);
- private static final long CLOSED_SUBTASK_CLEANUP_ROUND_INTERVAL = 1000;
- private long closedSubtaskCleanupRoundCounter = 0;
-
private static final int SLEEP_INTERVAL_ADJUSTMENT_ROUND_INTERVAL = 100;
private int totalRoundInAdjustmentInterval = 0;
private int workingRoundInAdjustmentInterval = 0;
@@ -56,12 +53,10 @@ public class PipeProcessorSubtaskWorker extends WrappedRunnable {
}
private void cleanupClosedSubtasksIfNecessary() {
- if (++closedSubtaskCleanupRoundCounter % CLOSED_SUBTASK_CLEANUP_ROUND_INTERVAL == 0) {
- subtasks.stream()
- .filter(PipeProcessorSubtask::isClosed)
- .collect(Collectors.toList())
- .forEach(subtasks::remove);
- }
+ subtasks.stream()
+ .filter(PipeProcessorSubtask::isClosed)
+ .collect(Collectors.toList())
+ .forEach(subtasks::remove);
}
private boolean runSubtasks() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 71a21eb5f05..3bd7254ec24 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -55,6 +55,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileInsertionEvent, ReferenceTrackableEvent {
@@ -74,7 +75,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
private final boolean isGeneratedByHistoricalExtractor;
private final AtomicBoolean isClosed;
- private TsFileInsertionDataContainer dataContainer;
+ private final AtomicReference<TsFileInsertionDataContainer> dataContainer;
// The point count of the TsFile. Used for metrics on PipeConsensus' receiver side.
// May be updated after it is flushed. Should be negative if not set.
@@ -163,6 +164,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
// If the status is "closed", then the resource status is "closed", the tsFile won't be altered
// and can be sent.
isClosed.set(resource.isClosed());
+
+ this.dataContainer = new AtomicReference<>(null);
}
/**
@@ -489,13 +492,12 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
private TsFileInsertionDataContainer initDataContainer() {
try {
- if (dataContainer == null) {
- dataContainer =
- new TsFileInsertionDataContainerProvider(
- tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this)
- .provide();
- }
- return dataContainer;
+ dataContainer.compareAndSet(
+ null,
+ new TsFileInsertionDataContainerProvider(
+ tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this)
+ .provide());
+ return dataContainer.get();
} catch (final IOException e) {
close();
@@ -532,10 +534,13 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
/** Release the resource of {@link TsFileInsertionDataContainer}. */
@Override
public void close() {
- if (dataContainer != null) {
- dataContainer.close();
- dataContainer = null;
- }
+ dataContainer.getAndUpdate(
+ container -> {
+ if (Objects.nonNull(container)) {
+ container.close();
+ }
+ return null;
+ });
}
/////////////////////////// Object ///////////////////////////
@@ -568,7 +573,12 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
@Override
public PipeEventResource eventResourceBuilder() {
return new PipeTsFileInsertionEventResource(
- this.isReleased, this.referenceCount, this.tsFile, this.isWithMod, this.modFile);
+ this.isReleased,
+ this.referenceCount,
+ this.tsFile,
+ this.isWithMod,
+ this.modFile,
+ this.dataContainer);
}
private static class PipeTsFileInsertionEventResource extends PipeEventResource {
@@ -576,26 +586,39 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
private final File tsFile;
private final boolean isWithMod;
private final File modFile;
+ private final AtomicReference<TsFileInsertionDataContainer> dataContainer;
private PipeTsFileInsertionEventResource(
final AtomicBoolean isReleased,
final AtomicInteger referenceCount,
final File tsFile,
final boolean isWithMod,
- final File modFile) {
+ final File modFile,
+ final AtomicReference<TsFileInsertionDataContainer> dataContainer) {
super(isReleased, referenceCount);
this.tsFile = tsFile;
this.isWithMod = isWithMod;
this.modFile = modFile;
+ this.dataContainer = dataContainer;
}
@Override
protected void finalizeResource() {
try {
+ // decrease reference count
PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile);
if (isWithMod) {
PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile);
}
+
+ // close data container
+ dataContainer.getAndUpdate(
+ container -> {
+ if (Objects.nonNull(container)) {
+ container.close();
+ }
+ return null;
+ });
} catch (final Exception e) {
LOGGER.warn(
String.format("Decrease reference count for TsFile %s error.", tsFile.getPath()), e);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 83ad7f04727..82518bda14d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
public abstract class BlockingPendingQueue<E extends Event> {
@@ -42,6 +43,8 @@ public abstract class BlockingPendingQueue<E extends Event> {
protected final PipeEventCounter eventCounter;
+ protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+
protected BlockingPendingQueue(
final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) {
this.pendingQueue = pendingQueue;
@@ -49,6 +52,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public boolean waitedOffer(final E event) {
+ checkBeforeOffer(event);
try {
final boolean offered =
pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
@@ -64,6 +68,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public boolean directOffer(final E event) {
+ checkBeforeOffer(event);
final boolean offered = pendingQueue.offer(event);
if (offered) {
eventCounter.increaseEventCount(event);
@@ -72,6 +77,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public boolean put(final E event) {
+ checkBeforeOffer(event);
try {
pendingQueue.put(event);
eventCounter.increaseEventCount(event);
@@ -102,6 +108,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public void clear() {
+ isClosed.set(true);
pendingQueue.clear();
eventCounter.reset();
}
@@ -112,6 +119,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public void discardAllEvents() {
+ isClosed.set(true);
pendingQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent) {
@@ -158,4 +166,10 @@ public abstract class BlockingPendingQueue<E extends Event> {
public int getPipeHeartbeatEventCount() {
return eventCounter.getPipeHeartbeatEventCount();
}
+
+ protected void checkBeforeOffer(final E event) {
+ if (isClosed.get() && event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName());
+ }
+ }
}