This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 84b2fd93b9f Pipe: avoid event accumulation in the stale sink pending 
queue & promptly clean up closed processor subtasks & close parser when 
releasing phantom reference of tsfile event (#14820) (#14835)
84b2fd93b9f is described below

commit 84b2fd93b9f41b0352c6284c46eb52e2fdfa0933
Author: VGalaxies <[email protected]>
AuthorDate: Fri Feb 14 17:55:52 2025 +0800

    Pipe: avoid event accumulation in the stale sink pending queue & promptly 
clean up closed processor subtasks & close parser when releasing phantom 
reference of tsfile event (#14820) (#14835)
---
 .../PipeRealtimePriorityBlockingQueue.java         |  2 +
 .../subtask/processor/PipeProcessorSubtask.java    |  5 +--
 .../processor/PipeProcessorSubtaskWorker.java      | 13 ++----
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 51 ++++++++++++++++------
 .../task/connection/BlockingPendingQueue.java      | 14 ++++++
 5 files changed, 59 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index 185b60c29e9..3edbcef539c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -51,6 +51,8 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
 
   @Override
   public boolean directOffer(final Event event) {
+    checkBeforeOffer(event);
+
     if (event instanceof TsFileInsertionEvent) {
       tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
       return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 7f29b3c55f9..b4bd024bfec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -228,10 +228,9 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     PipeProcessorMetrics.getInstance().deregister(taskID);
     try {
       isClosed.set(true);
-
-      // pipeProcessor closes first, then no more events will be added into 
outputEventCollector.
-      // only after that, outputEventCollector can be closed.
       pipeProcessor.close();
+      // It is important to note that even if the subtask and its 
corresponding processor are
+      // closed, the execution thread may still deliver events downstream.
     } catch (final Exception e) {
       LOGGER.info(
           "Exception occurred when closing pipe processor subtask {}, root 
cause: {}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java
index 44fd578c84d..813d0731721 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java
@@ -33,9 +33,6 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeProcessorSubtaskWorker.class);
 
-  private static final long CLOSED_SUBTASK_CLEANUP_ROUND_INTERVAL = 1000;
-  private long closedSubtaskCleanupRoundCounter = 0;
-
   private static final int SLEEP_INTERVAL_ADJUSTMENT_ROUND_INTERVAL = 100;
   private int totalRoundInAdjustmentInterval = 0;
   private int workingRoundInAdjustmentInterval = 0;
@@ -56,12 +53,10 @@ public class PipeProcessorSubtaskWorker extends 
WrappedRunnable {
   }
 
   private void cleanupClosedSubtasksIfNecessary() {
-    if (++closedSubtaskCleanupRoundCounter % 
CLOSED_SUBTASK_CLEANUP_ROUND_INTERVAL == 0) {
-      subtasks.stream()
-          .filter(PipeProcessorSubtask::isClosed)
-          .collect(Collectors.toList())
-          .forEach(subtasks::remove);
-    }
+    subtasks.stream()
+        .filter(PipeProcessorSubtask::isClosed)
+        .collect(Collectors.toList())
+        .forEach(subtasks::remove);
   }
 
   private boolean runSubtasks() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 71a21eb5f05..3bd7254ec24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -55,6 +55,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeTsFileInsertionEvent extends EnrichedEvent
     implements TsFileInsertionEvent, ReferenceTrackableEvent {
@@ -74,7 +75,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
   private final boolean isGeneratedByHistoricalExtractor;
 
   private final AtomicBoolean isClosed;
-  private TsFileInsertionDataContainer dataContainer;
+  private final AtomicReference<TsFileInsertionDataContainer> dataContainer;
 
   // The point count of the TsFile. Used for metrics on PipeConsensus' 
receiver side.
   // May be updated after it is flushed. Should be negative if not set.
@@ -163,6 +164,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
     // If the status is "closed", then the resource status is "closed", the 
tsFile won't be altered
     // and can be sent.
     isClosed.set(resource.isClosed());
+
+    this.dataContainer = new AtomicReference<>(null);
   }
 
   /**
@@ -489,13 +492,12 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent
 
   private TsFileInsertionDataContainer initDataContainer() {
     try {
-      if (dataContainer == null) {
-        dataContainer =
-            new TsFileInsertionDataContainerProvider(
-                    tsFile, pipePattern, startTime, endTime, pipeTaskMeta, 
this)
-                .provide();
-      }
-      return dataContainer;
+      dataContainer.compareAndSet(
+          null,
+          new TsFileInsertionDataContainerProvider(
+                  tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this)
+              .provide());
+      return dataContainer.get();
     } catch (final IOException e) {
       close();
 
@@ -532,10 +534,13 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent
   /** Release the resource of {@link TsFileInsertionDataContainer}. */
   @Override
   public void close() {
-    if (dataContainer != null) {
-      dataContainer.close();
-      dataContainer = null;
-    }
+    dataContainer.getAndUpdate(
+        container -> {
+          if (Objects.nonNull(container)) {
+            container.close();
+          }
+          return null;
+        });
   }
 
   /////////////////////////// Object ///////////////////////////
@@ -568,7 +573,12 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
   @Override
   public PipeEventResource eventResourceBuilder() {
     return new PipeTsFileInsertionEventResource(
-        this.isReleased, this.referenceCount, this.tsFile, this.isWithMod, 
this.modFile);
+        this.isReleased,
+        this.referenceCount,
+        this.tsFile,
+        this.isWithMod,
+        this.modFile,
+        this.dataContainer);
   }
 
   private static class PipeTsFileInsertionEventResource extends 
PipeEventResource {
@@ -576,26 +586,39 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent
     private final File tsFile;
     private final boolean isWithMod;
     private final File modFile;
+    private final AtomicReference<TsFileInsertionDataContainer> dataContainer;
 
     private PipeTsFileInsertionEventResource(
         final AtomicBoolean isReleased,
         final AtomicInteger referenceCount,
         final File tsFile,
         final boolean isWithMod,
-        final File modFile) {
+        final File modFile,
+        final AtomicReference<TsFileInsertionDataContainer> dataContainer) {
       super(isReleased, referenceCount);
       this.tsFile = tsFile;
       this.isWithMod = isWithMod;
       this.modFile = modFile;
+      this.dataContainer = dataContainer;
     }
 
     @Override
     protected void finalizeResource() {
       try {
+        // decrease reference count
         PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile);
         if (isWithMod) {
           PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile);
         }
+
+        // close data container
+        dataContainer.getAndUpdate(
+            container -> {
+              if (Objects.nonNull(container)) {
+                container.close();
+              }
+              return null;
+            });
       } catch (final Exception e) {
         LOGGER.warn(
             String.format("Decrease reference count for TsFile %s error.", 
tsFile.getPath()), e);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 83ad7f04727..82518bda14d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 public abstract class BlockingPendingQueue<E extends Event> {
@@ -42,6 +43,8 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   protected final PipeEventCounter eventCounter;
 
+  protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter 
eventCounter) {
     this.pendingQueue = pendingQueue;
@@ -49,6 +52,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean waitedOffer(final E event) {
+    checkBeforeOffer(event);
     try {
       final boolean offered =
           pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, 
TimeUnit.MILLISECONDS);
@@ -64,6 +68,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean directOffer(final E event) {
+    checkBeforeOffer(event);
     final boolean offered = pendingQueue.offer(event);
     if (offered) {
       eventCounter.increaseEventCount(event);
@@ -72,6 +77,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean put(final E event) {
+    checkBeforeOffer(event);
     try {
       pendingQueue.put(event);
       eventCounter.increaseEventCount(event);
@@ -102,6 +108,7 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
   }
 
   public void clear() {
+    isClosed.set(true);
     pendingQueue.clear();
     eventCounter.reset();
   }
@@ -112,6 +119,7 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
   }
 
   public void discardAllEvents() {
+    isClosed.set(true);
     pendingQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent) {
@@ -158,4 +166,10 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
   public int getPipeHeartbeatEventCount() {
     return eventCounter.getPipeHeartbeatEventCount();
   }
+
+  protected void checkBeforeOffer(final E event) {
+    if (isClosed.get() && event instanceof EnrichedEvent) {
+      ((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName());
+    }
+  }
 }

Reply via email to