This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch drop-pipe-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 543efacc830755c006836575e65fb2e1ab7ed09d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 27 12:22:13 2026 +0800

    wd
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |   5 +-
 .../websocket/WebSocketConnectorServer.java        | 143 +++++++++++++++++----
 .../sink/protocol/websocket/WebSocketSink.java     |  11 +-
 .../task/subtask/sink/PipeSinkSubtaskTest.java     |  61 +++++++++
 .../apache/iotdb/db/pipe/sink/PipeSinkTest.java    |  42 ++++++
 .../commons/pipe/sink/protocol/IoTDBSink.java      |   2 +-
 .../protocol/PipeConnectorWithEventDiscard.java    |  25 ++++
 7 files changed, 259 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index f6008822e61..dd31d5b5fd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -248,8 +249,8 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       decreaseHighPriorityTaskCount();
     }
 
-    if (outputPipeSink instanceof IoTDBSink) {
-      ((IoTDBSink) outputPipeSink)
+    if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
+      ((PipeConnectorWithEventDiscard) outputPipeSink)
           .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 491d40a1e45..6b402b27026 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +57,7 @@ public class WebSocketConnectorServer extends WebSocketServer 
{
   // Map<pipeName, Map<eventId, Tuple<connector, event>>>
   private final ConcurrentHashMap<String, ConcurrentHashMap<Long, 
EventWaitingForAck>>
       eventsWaitingForAck = new ConcurrentHashMap<>();
+  private final Set<String> droppedPipeTaskKeys = 
ConcurrentHashMap.newKeySet();
 
   private final BidiMap<String, WebSocket> router =
       new DualTreeBidiMap<String, WebSocket>(null, 
Comparator.comparing(Object::hashCode)) {};
@@ -97,13 +99,8 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           eventWrappers = new ArrayList<>(eventTransferQueue);
           eventTransferQueue.clear();
         }
-        eventWrappers.forEach(
-            (eventWrapper) -> {
-              if (eventWrapper.event instanceof EnrichedEvent) {
-                ((EnrichedEvent) eventWrapper.event)
-                    
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
-              }
-            });
+        eventWrappers.forEach(eventWrapper -> 
discardEvent(eventWrapper.event));
+        eventWrappers.clear();
         synchronized (eventTransferQueue) {
           eventTransferQueue.notifyAll();
         }
@@ -113,13 +110,35 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     if (eventsWaitingForAck.containsKey(pipeName)) {
       eventsWaitingForAck
           .remove(pipeName)
-          .forEach(
-              (eventId, eventWrapper) -> {
-                if (eventWrapper.event instanceof EnrichedEvent) {
-                  ((EnrichedEvent) eventWrapper.event)
-                      
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
-                }
-              });
+          .forEach((eventId, eventWrapper) -> 
discardEvent(eventWrapper.event));
+    }
+
+    droppedPipeTaskKeys.removeIf(key -> key.startsWith(pipeName + "_"));
+  }
+
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, 
creationTimeToDrop, regionId));
+
+    final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
+        eventsWaitingForTransfer.get(pipeNameToDrop);
+    if (eventTransferQueue != null) {
+      eventTransferQueue.removeIf(
+          eventWrapper ->
+              discardIfMatches(
+                  eventWrapper.event, pipeNameToDrop, creationTimeToDrop, 
regionId));
+      synchronized (eventTransferQueue) {
+        eventTransferQueue.notifyAll();
+      }
+    }
+
+    final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+        eventsWaitingForAck.get(pipeNameToDrop);
+    if (eventId2EventMap != null) {
+      eventId2EventMap.entrySet().removeIf(
+          entry ->
+              discardIfMatches(
+                  entry.getValue().event, pipeNameToDrop, creationTimeToDrop, 
regionId));
     }
   }
 
@@ -300,21 +319,24 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
   }
 
   public void addEvent(Event event, WebSocketSink connector) {
+    if (isDroppedPipe(event)) {
+      discardEvent(event);
+      return;
+    }
+
+    final String pipeName = connector.getPipeName();
     final PriorityBlockingQueue<EventWaitingForTransfer> queue =
-        eventsWaitingForTransfer.get(connector.getPipeName());
+        eventsWaitingForTransfer.get(pipeName);
 
     if (queue == null) {
       LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", 
connector, event);
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event)
-            .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), 
false);
-      }
+      discardEvent(event);
       return;
     }
 
     if (queue.size() >= 5) {
       synchronized (queue) {
-        while (queue.size() >= 5) {
+        while (queue.size() >= 5 && isQueueAvailable(pipeName, queue) && 
!isDroppedPipe(event)) {
           try {
             queue.wait();
           } catch (InterruptedException e) {
@@ -323,15 +345,27 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           }
         }
 
+        if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) {
+          discardEvent(event);
+          return;
+        }
+
         queue.put(
             new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), 
connector, event));
         return;
       }
     }
 
+    if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) {
+      discardEvent(event);
+      return;
+    }
+
+    // Enqueue the event exactly once: a duplicate put after this block
+    // would queue the same event again under a second generated event id.
     synchronized (queue) {
       queue.put(new 
EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
     }
   }
 
   private class TransferThread extends Thread {
@@ -377,6 +411,11 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       final WebSocketSink connector = element.connector;
 
       try {
+        if (isDroppedPipe(event)) {
+          discardEvent(event);
+          return;
+        }
+
         ByteBuffer tabletBuffer;
         if (event instanceof PipeRawTabletInsertionEvent) {
           tabletBuffer = ((PipeRawTabletInsertionEvent) 
event).convertToTablet().serialize();
@@ -387,7 +426,11 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
         }
 
         if (tabletBuffer == null) {
-          connector.commit((EnrichedEvent) event);
+          if (isDroppedPipe(event)) {
+            discardEvent(event);
+          } else {
+            connector.commit((EnrichedEvent) event);
+          }
           return;
         }
 
@@ -398,11 +441,17 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
 
         server.broadcast(payload, 
Collections.singletonList(router.get(pipeName)));
 
+        if (isDroppedPipe(event)) {
+          discardEvent(event);
+          return;
+        }
+
         final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
             eventsWaitingForAck.get(pipeName);
         if (eventId2EventMap == null) {
           LOGGER.warn(
               "The pipe {} was dropped so the event ack {} will be ignored.", 
pipeName, eventId);
+          discardEvent(event);
           return;
         }
         eventId2EventMap.put(eventId, new EventWaitingForAck(connector, 
event));
@@ -410,13 +459,10 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
         synchronized (server) {
           final PriorityBlockingQueue<EventWaitingForTransfer> queue =
               eventsWaitingForTransfer.get(pipeName);
-          if (queue == null) {
+          if (queue == null || isDroppedPipe(event)) {
             LOGGER.warn(
                 "The pipe {} was dropped so the event {} will be dropped.", 
pipeName, eventId);
-            if (event instanceof EnrichedEvent) {
-              ((EnrichedEvent) event)
-                  
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
-            }
+            discardEvent(event);
             return;
           }
 
@@ -465,4 +511,49 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       this.event = event;
     }
   }
+
+  private boolean discardIfMatches(
+      final Event event,
+      final String pipeNameToDrop,
+      final long creationTimeToDrop,
+      final int regionId) {
+    if (!(event instanceof EnrichedEvent)) {
+      return false;
+    }
+
+    final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+    if (!pipeNameToDrop.equals(enrichedEvent.getPipeName())
+        || creationTimeToDrop != enrichedEvent.getCreationTime()
+        || regionId != enrichedEvent.getRegionId()) {
+      return false;
+    }
+
+    discardEvent(enrichedEvent);
+    return true;
+  }
+
+  private boolean isDroppedPipe(final Event event) {
+    return event instanceof EnrichedEvent
+        && droppedPipeTaskKeys.contains(
+            generatePipeTaskKey(
+                ((EnrichedEvent) event).getPipeName(),
+                ((EnrichedEvent) event).getCreationTime(),
+                ((EnrichedEvent) event).getRegionId()));
+  }
+
+  private boolean isQueueAvailable(
+      final String pipeName, final 
PriorityBlockingQueue<EventWaitingForTransfer> queue) {
+    return eventsWaitingForTransfer.get(pipeName) == queue;
+  }
+
+  private static String generatePipeTaskKey(
+      final String pipeName, final long creationTime, final int regionId) {
+    return pipeName + "_" + creationTime + "_" + regionId;
+  }
+
+  private void discardEvent(final Event event) {
+    if (event instanceof EnrichedEvent) {
+      ((EnrichedEvent) 
event).clearReferenceCount(WebSocketSink.class.getName());
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index 4e593e673fd..dadeee8053d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
+import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -41,7 +42,7 @@ import java.util.Arrays;
 import java.util.Optional;
 
 @TreeModel
-public class WebSocketSink implements PipeConnector {
+public class WebSocketSink implements PipeConnector, 
PipeConnectorWithEventDiscard {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketSink.class);
 
@@ -166,6 +167,14 @@ public class WebSocketSink implements PipeConnector {
     }
   }
 
+  @Override
+  public void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    if (server != null) {
+      server.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+    }
+  }
+
   public void commit(EnrichedEvent enrichedEvent) {
     Optional.ofNullable(enrichedEvent)
         .ifPresent(event -> 
event.decreaseReferenceCount(WebSocketSink.class.getName(), true));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
new file mode 100644
index 00000000000..ddfc699721b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
+
+import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
+import org.apache.iotdb.pipe.api.PipeConnector;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.withSettings;
+
+public class PipeSinkSubtaskTest {
+
+  @Test
+  public void testDiscardEventsOfPipeDelegatesToConnector() {
+    final PipeConnector connector =
+        mock(
+            PipeConnector.class,
+            
withSettings().extraInterfaces(PipeConnectorWithEventDiscard.class));
+    final UnboundedBlockingPendingQueue<?> pendingQueue = 
mock(UnboundedBlockingPendingQueue.class);
+
+    final PipeSinkSubtask subtask =
+        Mockito.spy(
+            new PipeSinkSubtask(
+                "PipeSinkSubtaskTest",
+                System.currentTimeMillis(),
+                "data_test",
+                0,
+                (UnboundedBlockingPendingQueue) pendingQueue,
+                connector));
+
+    try {
+      subtask.discardEventsOfPipe("pipe", 1L, 1);
+
+      verify((PipeConnectorWithEventDiscard) 
connector).discardEventsOfPipe("pipe", 1L, 1);
+    } finally {
+      subtask.close();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index 7dfd0446038..3ad262130ff 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -29,6 +29,8 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
+import 
org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer;
+import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -39,6 +41,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.security.SecureRandom;
 import java.util.Arrays;
@@ -146,6 +149,45 @@ public class PipeSinkTest {
     }
   }
 
+  @Test
+  public void testWebSocketSinkDropDoesNotRequeueDroppedPipeEvents() {
+    final String pipeName = "pipe_" + System.nanoTime();
+    final WebSocketConnectorServer server = 
WebSocketConnectorServer.getOrCreateInstance(0);
+    final WebSocketSink connector = Mockito.mock(WebSocketSink.class);
+    Mockito.when(connector.getPipeName()).thenReturn(pipeName);
+
+    server.register(connector);
+    try {
+      final PipeRawTabletInsertionEvent droppedEvent =
+          createPipeRawTabletInsertionEvent(pipeName, 1L, 1);
+      droppedEvent.increaseReferenceCount(WebSocketSink.class.getName());
+      droppedEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 1L, 
1, -1), 1L);
+      server.addEvent(droppedEvent, connector);
+
+      server.discardEventsOfPipe(pipeName, 1L, 1);
+      Assert.assertTrue(droppedEvent.isReleased());
+
+      final PipeRawTabletInsertionEvent recreatedDroppedPipeEvent =
+          createPipeRawTabletInsertionEvent(pipeName, 1L, 1);
+      
recreatedDroppedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName());
+      recreatedDroppedPipeEvent.setCommitterKeyAndCommitId(
+          new CommitterKey(pipeName, 1L, 1, -1), 2L);
+      server.addEvent(recreatedDroppedPipeEvent, connector);
+
+      Assert.assertTrue(recreatedDroppedPipeEvent.isReleased());
+
+      final PipeRawTabletInsertionEvent recreatedPipeEvent =
+          createPipeRawTabletInsertionEvent(pipeName, 2L, 1);
+      recreatedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName());
+      recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 
2L, 1, -1), 3L);
+      server.addEvent(recreatedPipeEvent, connector);
+
+      Assert.assertFalse(recreatedPipeEvent.isReleased());
+    } finally {
+      server.unregister(connector);
+    }
+  }
+
   @Test
   public void testOpcUaSink() {
     final List<IMeasurementSchema> schemaList =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 7de06376b6d..ebb3d90bc8c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -148,7 +148,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 
 @TreeModel
 @TableModel
-public abstract class IoTDBSink implements PipeConnector {
+public abstract class IoTDBSink implements PipeConnector, 
PipeConnectorWithEventDiscard {
 
   private static final String PARSE_URL_ERROR_FORMATTER =
       "Exception occurred while parsing node urls from target servers: {}";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
new file mode 100644
index 00000000000..ab4dbcf9075
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.protocol;
+
+public interface PipeConnectorWithEventDiscard {
+
+  void discardEventsOfPipe(String pipeName, long creationTime, int regionId);
+}

Reply via email to