This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch drop-pipe-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4ec94cd7bf7668a7bbc57ffb14ae68317f8832a6
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 27 11:18:40 2026 +0800

    drop-1
---
 .../sink/PipeRealtimePriorityBlockingQueue.java    |   9 ++--
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  10 ++--
 .../subtask/sink/PipeSinkSubtaskLifeCycle.java     |   5 +-
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |   2 +-
 .../evolvable/batch/PipeTabletEventBatch.java      |   7 ++-
 .../batch/PipeTransferBatchReqBuilder.java         |   8 ++--
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  51 ++++++++++++++++----
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |   5 +-
 .../subtask/SubscriptionSinkSubtaskLifeCycle.java  |   5 +-
 .../subtask/SubscriptionSinkSubtaskManager.java    |   2 +-
 .../apache/iotdb/db/pipe/sink/PipeSinkTest.java    |  53 +++++++++++++++++++++
 .../opc_security/8443_12686/iotdb-server.pfx       | Bin 0 -> 2942 bytes
 .../task/connection/BlockingPendingQueue.java      |  17 +++++--
 .../commons/pipe/sink/protocol/IoTDBSink.java      |   3 +-
 14 files changed, 145 insertions(+), 32 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 6d227ac31fd..3d553f73595 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -356,13 +356,14 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
   }
 
   @Override
-  public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
-    super.discardEventsOfPipe(pipeNameToDrop, regionId);
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
+    super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     tsfileInsertEventDeque.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isEventFromPipe(
+                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
             if (((EnrichedEvent) event)
                 .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 6e26a76d774..f6008822e61 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -198,9 +198,10 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
    * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
    * its queued events in the output pipe connector.
    */
-  public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
     // Try to remove the events as much as possible
-    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
+    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
 
     try {
       increaseHighPriorityTaskCount();
@@ -214,6 +215,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
         // will.
         if (lastEvent instanceof EnrichedEvent
             && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
+            && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime()
             && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
           // Do not clear the last event's reference counts because it may be on transferring
           lastEvent = null;
@@ -237,6 +239,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
         // "nonnull" detection.
         if (lastExceptionEvent instanceof EnrichedEvent
             && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
+            && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime()
             && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
           clearReferenceCountAndReleaseLastExceptionEvent();
         }
@@ -246,7 +249,8 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
     }
 
     if (outputPipeSink instanceof IoTDBSink) {
-      ((IoTDBSink) outputPipeSink).discardEventsOfPipe(pipeNameToDrop, regionId);
+      ((IoTDBSink) outputPipeSink)
+          .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 35f7983075d..1780f5a87ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -92,12 +92,13 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable {
    *     {@link PipeSinkSubtask} should never be used again
    * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
    */
-  public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) {
+  public synchronized boolean deregister(
+      final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
 
-    subtask.discardEventsOfPipe(pipeNameToDeregister, regionId);
+    subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId);
 
     try {
       if (registeredTaskCount > 1) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 9138a075918..4faa42db004 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -209,7 +209,7 @@ public class PipeSinkSubtaskManager {
     // Shall not be empty
     final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
 
-    lifeCycles.removeIf(o -> o.deregister(pipeName, regionId));
+    lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 96bddd0d672..c44e12a4bbf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -154,10 +154,13 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
    * Discard all events of the given pipe. This method only clears the reference count of the events
    * and discard them, but do not modify other objects (such as buffers) for simplicity.
    */
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
     events.removeIf(
         event -> {
-          if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) {
+          if (pipeNameToDrop.equals(event.getPipeName())
+              && creationTimeToDrop == event.getCreationTime()
+              && regionId == event.getRegionId()) {
             event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             return true;
           }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index dd4d4fe1ce6..5bb76ae40e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -196,9 +196,11 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
         && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
   }
 
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
-    defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId);
-    endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId));
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
+    defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+    endPointToBatch.values().forEach(
+        batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId));
   }
 
   public int size() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index d19639310cf..c00e36e0c7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -77,6 +77,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -126,6 +127,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
   private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
       new ConcurrentHashMap<>();
+  private final Set<String> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet();
 
   private boolean enableSendTsFileLimit;
   private volatile boolean isConnectionException;
@@ -681,8 +683,15 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   public void addFailureEventToRetryQueue(final Event event, final Exception e) {
     isConnectionException =
         e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e);
-    if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) {
-      return;
+    if (event instanceof EnrichedEvent) {
+      final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+      if (enrichedEvent.isReleased()) {
+        return;
+      }
+      if (isDroppedPipe(enrichedEvent)) {
+        enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
+        return;
+      }
     }
 
     if (isClosed.get()) {
@@ -728,15 +737,18 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   //////////////////////////// Operations for close ////////////////////////////
 
   @Override
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
-    if (isTabletBatchModeEnabled) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
+    droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId));
+
+    if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
+      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     }
     retryEventQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isDroppedPipe(
+                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
             ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -747,8 +759,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     retryTsFileQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isDroppedPipe(
+                  (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
             ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
             retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
@@ -792,6 +804,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
     // clear reference count of events in retry queue after closing async client
     clearRetryEventsReferenceCount();
+    droppedPipeTaskKeys.clear();
 
     super.close();
   }
@@ -848,6 +861,26 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     this.transferTsFileCounter = transferTsFileCounter;
   }
 
+  private boolean isDroppedPipe(final EnrichedEvent event) {
+    return droppedPipeTaskKeys.contains(
+        generatePipeTaskKey(event.getPipeName(), event.getCreationTime(), event.getRegionId()));
+  }
+
+  private static boolean isDroppedPipe(
+      final EnrichedEvent event,
+      final String pipeNameToDrop,
+      final long creationTimeToDrop,
+      final int regionId) {
+    return pipeNameToDrop.equals(event.getPipeName())
+        && creationTimeToDrop == event.getCreationTime()
+        && regionId == event.getRegionId();
+  }
+
+  private static String generatePipeTaskKey(
+      final String pipeName, final long creationTime, final int regionId) {
+    return pipeName + "_" + creationTime + "_" + regionId;
+  }
+
   @Override
   public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
     if (tabletBatchBuilder != null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 9357a8c6a6d..e8c4420861c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -599,9 +599,10 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink {
   }
 
   @Override
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
     if (Objects.nonNull(tabletBatchBuilder)) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
+      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
index 98163697374..390a6d58018 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
@@ -63,7 +63,10 @@ public class SubscriptionSinkSubtaskLifeCycle extends PipeSinkSubtaskLifeCycle {
   }
 
   @Override
-  public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) {
+  public synchronized boolean deregister(
+      final String pipeNameToDeregister,
+      final long creationTimeToDeregister,
+      final int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 6d5f27d8172..16a9ee1a03d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -168,7 +168,7 @@ public class SubscriptionSinkSubtaskManager {
 
     final PipeSinkSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
-    if (lifeCycle.deregister(pipeName, regionId)) {
+    if (lifeCycle.deregister(pipeName, creationTime, regionId)) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index ec2122b9175..7dfd0446038 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
@@ -30,6 +31,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncS
 import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.write.record.Tablet;
@@ -104,6 +106,46 @@ public class PipeSinkTest {
     }
   }
 
+  @Test
+  public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception {
+    try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) {
+      final PipeParameters parameters =
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(
+                      PipeSinkConstant.CONNECTOR_KEY,
+                      BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
+                  put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668");
+                }
+              });
+      connector.validate(new PipeParameterValidator(parameters));
+      connector.customize(
+          parameters,
+          new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+
+      final PipeRawTabletInsertionEvent droppedEvent =
+          createPipeRawTabletInsertionEvent("pipe", 1L, 1);
+      droppedEvent.increaseReferenceCount("test");
+      droppedEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 1L, 1, -1), 1L);
+
+      connector.discardEventsOfPipe("pipe", 1L, 1);
+      connector.addFailureEventToRetryQueue(droppedEvent, new PipeException("test"));
+
+      Assert.assertEquals(0, connector.getRetryEventQueueSize());
+      Assert.assertTrue(droppedEvent.isReleased());
+
+      final PipeRawTabletInsertionEvent recreatedPipeEvent =
+          createPipeRawTabletInsertionEvent("pipe", 2L, 1);
+      recreatedPipeEvent.increaseReferenceCount("test");
+      recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 2L, 1, -1), 1L);
+
+      connector.addFailureEventToRetryQueue(recreatedPipeEvent, new PipeException("test"));
+
+      Assert.assertEquals(1, connector.getRetryEventQueueSize());
+    }
+  }
+
   @Test
   public void testOpcUaSink() {
     final List<IMeasurementSchema> schemaList =
@@ -194,4 +236,15 @@ public class PipeSinkTest {
       Assert.fail();
     }
   }
+
+  private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+      final String pipeName, final long creationTime, final int regionId) {
+    final List<IMeasurementSchema> schemaList =
+        Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+    final Tablet tablet = new Tablet("root.db.d" + regionId, schemaList, 1);
+    tablet.addTimestamp(0, 1L);
+    tablet.addValue("s1", 0, 1L);
+    return new PipeRawTabletInsertionEvent(
+        false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false);
+  }
 }
diff --git a/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx b/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx
new file mode 100644
index 00000000000..b36d23da368
Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx differ
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 8773b03f9f3..b3b796ab6d8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -122,12 +122,13 @@ public abstract class BlockingPendingQueue<E extends Event> {
     eventCounter.reset();
   }
 
-  public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
     pendingQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
-              && regionId == ((EnrichedEvent) event).getRegionId()) {
+              && isEventFromPipe(
+                  ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
             if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
             }
@@ -162,4 +163,14 @@ public abstract class BlockingPendingQueue<E extends Event> {
       ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName());
     }
   }
+
+  protected static boolean isEventFromPipe(
+      final EnrichedEvent event,
+      final String pipeNameToDrop,
+      final long creationTimeToDrop,
+      final int regionId) {
+    return pipeNameToDrop.equals(event.getPipeName())
+        && creationTimeToDrop == event.getCreationTime()
+        && regionId == event.getRegionId();
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 88a8b71775f..7de06376b6d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -641,7 +641,8 @@ public abstract class IoTDBSink implements PipeConnector {
    * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard
    * its batched or queued events in the output pipe connector.
    */
-  public synchronized void discardEventsOfPipe(final String pipeName, final int regionId) {
+  public synchronized void discardEventsOfPipe(
+      final String pipeName, final long creationTime, final int regionId) {
     // Do nothing by default
   }
 

Reply via email to