This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch drop-pipe-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8114969d0a7601c3697f09cfedc74523c8fcfe0 Author: Caideyipi <[email protected]> AuthorDate: Mon Apr 27 18:50:05 2026 +0800 fix --- .../db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java | 2 +- .../evolvable/batch/PipeTransferBatchReqBuilder.java | 5 +++-- .../sink/protocol/websocket/WebSocketConnectorServer.java | 13 +++++++------ .../db/pipe/sink/protocol/websocket/WebSocketSink.java | 2 +- .../task/subtask/SubscriptionSinkSubtaskLifeCycle.java | 4 +--- .../pipe/agent/task/connection/PipeEventCollectorTest.java | 12 +++++++++++- .../java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java | 12 +++++++++++- .../pipe/agent/task/connection/BlockingPendingQueue.java | 5 ++--- 8 files changed, 37 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index dd31d5b5fd8..a11a1a68f0c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -25,8 +25,8 @@ import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; -import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 5bb76ae40e5..49d9d8cea09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -199,8 +199,9 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); - endPointToBatch.values().forEach( - batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); + endPointToBatch + .values() + .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 6b402b27026..4706eb0275a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -125,8 +125,7 @@ public class WebSocketConnectorServer extends WebSocketServer { if (eventTransferQueue != null) { eventTransferQueue.removeIf( eventWrapper -> - discardIfMatches( - eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + discardIfMatches(eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } @@ -135,10 +134,12 @@ public class WebSocketConnectorServer extends WebSocketServer { final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap = eventsWaitingForAck.get(pipeNameToDrop); if (eventId2EventMap != null) { - eventId2EventMap.entrySet().removeIf( - entry -> - discardIfMatches( - entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); + eventId2EventMap + .entrySet() + .removeIf( + entry -> + discardIfMatches( + entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index dadeee8053d..e8a38d63e6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; -import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index 390a6d58018..af871feaa7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -64,9 +64,7 @@ public class SubscriptionSinkSubtaskLifeCycle extends PipeSinkSubtaskLifeCycle { @Override public synchronized boolean deregister( - final String pipeNameToDeregister, - final long creationTimeToDeregister, - final int regionId) { + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java index bbd5e0b5e3d..029a722c8a9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java @@ -82,6 +82,16 @@ public class PipeEventCollectorTest { tablet.addTimestamp(0, 1L); tablet.addValue("s1", 0, 1L); return new PipeRawTabletInsertionEvent( - false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index 3ad262130ff..cf311639ee9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -287,6 +287,16 @@ public class PipeSinkTest { tablet.addTimestamp(0, 1L); tablet.addValue("s1", 0, 1L); return new PipeRawTabletInsertionEvent( - false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 7080a2fe6f9..adbc79d5004 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -27,11 +27,11 @@ import org.apache.iotdb.pipe.api.event.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.Set; import java.util.function.Consumer; public abstract class BlockingPendingQueue<E extends Event> { @@ -196,8 +196,7 @@ public abstract class BlockingPendingQueue<E extends Event> { ((EnrichedEvent) event).getRegionId()); } - public boolean isPipeDropped( - final String pipeName, final long creationTime, final int regionId) { + public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { return droppedPipeTaskKeys.contains(generatePipeTaskKey(pipeName, creationTime, regionId)); }
