This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch drop-pipe-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a3b7ea0bcd4173a562ce1f9d50c5728f79171db6
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 27 14:28:36 2026 +0800

    drop
---
 .../agent/task/connection/PipeEventCollector.java  | 18 +++--
 .../sink/PipeRealtimePriorityBlockingQueue.java    |  4 +-
 .../task/connection/PipeEventCollectorTest.java    | 87 ++++++++++++++++++++++
 .../task/connection/BlockingPendingQueue.java      | 40 +++++++++-
 4 files changed, 139 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index a22848ee3ba..b000c5d2366 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -220,7 +220,8 @@ public class PipeEventCollector implements EventCollector {
 
   private void collectEvent(final Event event) {
     if (event instanceof EnrichedEvent) {
-      if (!((EnrichedEvent) 
event).increaseReferenceCount(PipeEventCollector.class.getName())) {
+      final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+      if 
(!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) {
         LOGGER.warn("PipeEventCollector: The event {} is already released, 
skipping it.", event);
         isFailedToIncreaseReferenceCount = true;
         return;
@@ -228,18 +229,25 @@ public class PipeEventCollector implements EventCollector 
{
 
       // Assign a commit id for this event in order to report progress in 
order.
       PipeEventCommitManager.getInstance()
-          .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
creationTime, regionId);
+          .enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, 
regionId);
 
       // Assign a rebootTime for iotConsensusV2
-      ((EnrichedEvent) 
event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
+      
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
+
+      if (enrichedEvent.getPipeName() != null
+          && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), 
creationTime, regionId)) {
+        enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
+        return;
+      }
     }
 
     if (event instanceof PipeHeartbeatEvent) {
       ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
     }
 
-    pendingQueue.offer(event);
-    collectInvocationCount.incrementAndGet();
+    if (pendingQueue.offer(event)) {
+      collectInvocationCount.incrementAndGet();
+    }
   }
 
   public void resetFlags() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 3d553f73595..f972bba0e6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -73,7 +73,9 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
 
   @Override
   public boolean offer(final Event event) {
-    checkBeforeOffer(event);
+    if (!checkBeforeOffer(event)) {
+      return false;
+    }
 
     if (event instanceof TsFileInsertionEvent) {
       tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
new file mode 100644
index 00000000000..bbd5e0b5e3d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task.connection;
+
+import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import 
org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeRealtimePriorityBlockingQueue;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeEventCollectorTest {
+
+  @Test
+  public void 
testCollectorDoesNotOfferEventsOfDroppedPipeToUnboundedPendingQueue() {
+    verifyCollectorDoesNotOfferEventsOfDroppedPipe(
+        new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter()));
+  }
+
+  @Test
+  public void 
testCollectorDoesNotOfferEventsOfDroppedPipeToRealtimePendingQueue() {
+    verifyCollectorDoesNotOfferEventsOfDroppedPipe(new 
PipeRealtimePriorityBlockingQueue());
+  }
+
+  private void verifyCollectorDoesNotOfferEventsOfDroppedPipe(
+      final UnboundedBlockingPendingQueue<Event> pendingQueue) {
+    pendingQueue.discardEventsOfPipe("pipe", 1L, 1);
+
+    final PipeEventCollector droppedPipeCollector =
+        new PipeEventCollector(pendingQueue, 1L, 1, false, false, false);
+    final PipeRawTabletInsertionEvent droppedPipeEvent =
+        createPipeRawTabletInsertionEvent("pipe", 1L);
+    droppedPipeCollector.collect(droppedPipeEvent);
+
+    Assert.assertTrue(droppedPipeEvent.isReleased());
+    Assert.assertEquals(0, pendingQueue.size());
+
+    final PipeEventCollector recreatedPipeCollector =
+        new PipeEventCollector(pendingQueue, 2L, 1, false, false, false);
+    final PipeRawTabletInsertionEvent recreatedPipeEvent =
+        createPipeRawTabletInsertionEvent("pipe", 2L);
+    recreatedPipeCollector.collect(recreatedPipeEvent);
+
+    Assert.assertFalse(recreatedPipeEvent.isReleased());
+    Assert.assertEquals(1, pendingQueue.size());
+
+    pendingQueue.discardAllEvents();
+    Assert.assertTrue(recreatedPipeEvent.isReleased());
+  }
+
+  private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+      final String pipeName, final long creationTime) {
+    final List<IMeasurementSchema> schemaList =
+        Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+    final Tablet tablet = new Tablet("root.db.d1", schemaList, 1);
+    tablet.addTimestamp(0, 1L);
+    tablet.addValue("s1", 0, 1L);
+    return new PipeRawTabletInsertionEvent(
+        false, "root.db", "db", "root.db", tablet, false, pipeName, 
creationTime, null, null, false);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index b3b796ab6d8..7080a2fe6f9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -28,8 +28,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
 import java.util.function.Consumer;
 
 public abstract class BlockingPendingQueue<E extends Event> {
@@ -43,6 +45,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
   protected final PipeEventCounter eventCounter;
 
   protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+  protected final Set<String> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter 
eventCounter) {
@@ -51,7 +54,10 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean offer(final E event) {
-    checkBeforeOffer(event);
+    if (!checkBeforeOffer(event)) {
+      return false;
+    }
+
     final boolean offered = pendingQueue.offer(event);
     if (offered) {
       eventCounter.increaseEventCount(event);
@@ -60,7 +66,9 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean put(final E event) {
-    checkBeforeOffer(event);
+    if (!checkBeforeOffer(event)) {
+      return false;
+    }
     try {
       pendingQueue.put(event);
       eventCounter.increaseEventCount(event);
@@ -101,6 +109,7 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
     isClosed.set(true);
     pendingQueue.clear();
     eventCounter.reset();
+    droppedPipeTaskKeys.clear();
   }
 
   /** DO NOT FORGET to set eventCounter to new value after invoking this 
method. */
@@ -120,10 +129,12 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
           return true;
         });
     eventCounter.reset();
+    droppedPipeTaskKeys.clear();
   }
 
   public void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, 
creationTimeToDrop, regionId));
     pendingQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
@@ -158,10 +169,12 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
     return eventCounter.getPipeHeartbeatEventCount();
   }
 
-  protected void checkBeforeOffer(final E event) {
-    if (isClosed.get() && event instanceof EnrichedEvent) {
+  protected boolean checkBeforeOffer(final E event) {
+    final boolean shouldReject = isClosed.get() || 
isEventFromDroppedPipe(event);
+    if (shouldReject && event instanceof EnrichedEvent) {
       ((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName());
     }
+    return !shouldReject;
   }
 
   protected static boolean isEventFromPipe(
@@ -173,4 +186,23 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
         && creationTimeToDrop == event.getCreationTime()
         && regionId == event.getRegionId();
   }
+
+  protected boolean isEventFromDroppedPipe(final E event) {
+    return event instanceof EnrichedEvent
+        && ((EnrichedEvent) event).getPipeName() != null
+        && isPipeDropped(
+            ((EnrichedEvent) event).getPipeName(),
+            ((EnrichedEvent) event).getCreationTime(),
+            ((EnrichedEvent) event).getRegionId());
+  }
+
+  public boolean isPipeDropped(
+      final String pipeName, final long creationTime, final int regionId) {
+    return droppedPipeTaskKeys.contains(generatePipeTaskKey(pipeName, 
creationTime, regionId));
+  }
+
+  private static String generatePipeTaskKey(
+      final String pipeName, final long creationTime, final int regionId) {
+    return pipeName + "_" + creationTime + "_" + regionId;
+  }
 }

Reply via email to