This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch drop-pipe-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d8114969d0a7601c3697f09cfedc74523c8fcfe0
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 27 18:50:05 2026 +0800

    fix
---
 .../db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java    |  2 +-
 .../evolvable/batch/PipeTransferBatchReqBuilder.java        |  5 +++--
 .../sink/protocol/websocket/WebSocketConnectorServer.java   | 13 +++++++------
 .../db/pipe/sink/protocol/websocket/WebSocketSink.java      |  2 +-
 .../task/subtask/SubscriptionSinkSubtaskLifeCycle.java      |  4 +---
 .../pipe/agent/task/connection/PipeEventCollectorTest.java  | 12 +++++++++++-
 .../java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java    | 12 +++++++++++-
 .../pipe/agent/task/connection/BlockingPendingQueue.java    |  5 ++---
 8 files changed, 37 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index dd31d5b5fd8..a11a1a68f0c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -25,8 +25,8 @@ import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
-import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 5bb76ae40e5..49d9d8cea09 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -199,8 +199,9 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
     defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId);
-    endPointToBatch.values().forEach(
-        batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, 
regionId));
+    endPointToBatch
+        .values()
+        .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, 
creationTimeToDrop, regionId));
   }
 
   public int size() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 6b402b27026..4706eb0275a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -125,8 +125,7 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     if (eventTransferQueue != null) {
       eventTransferQueue.removeIf(
           eventWrapper ->
-              discardIfMatches(
-                  eventWrapper.event, pipeNameToDrop, creationTimeToDrop, 
regionId));
+              discardIfMatches(eventWrapper.event, pipeNameToDrop, 
creationTimeToDrop, regionId));
       synchronized (eventTransferQueue) {
         eventTransferQueue.notifyAll();
       }
@@ -135,10 +134,12 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
         eventsWaitingForAck.get(pipeNameToDrop);
     if (eventId2EventMap != null) {
-      eventId2EventMap.entrySet().removeIf(
-          entry ->
-              discardIfMatches(
-                  entry.getValue().event, pipeNameToDrop, creationTimeToDrop, 
regionId));
+      eventId2EventMap
+          .entrySet()
+          .removeIf(
+              entry ->
+                  discardIfMatches(
+                      entry.getValue().event, pipeNameToDrop, 
creationTimeToDrop, regionId));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index dadeee8053d..e8a38d63e6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.websocket;
 
-import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
index 390a6d58018..af871feaa7e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
@@ -64,9 +64,7 @@ public class SubscriptionSinkSubtaskLifeCycle extends 
PipeSinkSubtaskLifeCycle {
 
   @Override
   public synchronized boolean deregister(
-      final String pipeNameToDeregister,
-      final long creationTimeToDeregister,
-      final int regionId) {
+      final String pipeNameToDeregister, final long creationTimeToDeregister, 
final int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
index bbd5e0b5e3d..029a722c8a9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
@@ -82,6 +82,16 @@ public class PipeEventCollectorTest {
     tablet.addTimestamp(0, 1L);
     tablet.addValue("s1", 0, 1L);
     return new PipeRawTabletInsertionEvent(
-        false, "root.db", "db", "root.db", tablet, false, pipeName, 
creationTime, null, null, false);
+        false,
+        "root.db",
+        "db",
+        "root.db",
+        tablet,
+        false,
+        pipeName,
+        creationTime,
+        null,
+        null,
+        false);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index 3ad262130ff..cf311639ee9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -287,6 +287,16 @@ public class PipeSinkTest {
     tablet.addTimestamp(0, 1L);
     tablet.addValue("s1", 0, 1L);
     return new PipeRawTabletInsertionEvent(
-        false, "root.db", "db", "root.db", tablet, false, pipeName, 
creationTime, null, null, false);
+        false,
+        "root.db",
+        "db",
+        "root.db",
+        tablet,
+        false,
+        pipeName,
+        creationTime,
+        null,
+        null,
+        false);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 7080a2fe6f9..adbc79d5004 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -27,11 +27,11 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Set;
 import java.util.function.Consumer;
 
 public abstract class BlockingPendingQueue<E extends Event> {
@@ -196,8 +196,7 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
             ((EnrichedEvent) event).getRegionId());
   }
 
-  public boolean isPipeDropped(
-      final String pipeName, final long creationTime, final int regionId) {
+  public boolean isPipeDropped(final String pipeName, final long creationTime, 
final int regionId) {
     return droppedPipeTaskKeys.contains(generatePipeTaskKey(pipeName, 
creationTime, regionId));
   }
 

Reply via email to