This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch Fix-drop in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b1dd7ea0363f47113e647a3228c06c88d38f3eef Author: Caideyipi <[email protected]> AuthorDate: Fri May 22 16:44:16 2026 +0800 fix --- .../agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java | 3 ++- .../iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java | 3 +-- .../db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java | 3 +-- .../pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java | 3 +-- .../db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java | 3 +-- .../commons/pipe/agent/task/connection/BlockingPendingQueue.java | 6 +++--- 6 files changed, 9 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index bea5fcfe854..8641dbc7867 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -364,7 +364,8 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ super.discardEventsOfPipe(committerKey); tsfileInsertEventDeque.removeIf( event -> { - if (event instanceof EnrichedEvent && isEventFromPipe((EnrichedEvent) event, committerKey)) { + if (event instanceof EnrichedEvent + && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index b633efe3129..90d325f6d23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -257,8 +257,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { return committerKey.getPipeName().equals(event.getPipeName()) && committerKey.getCreationTime() == event.getCreationTime() && committerKey.getRegionId() == event.getRegionId() - && (committerKey.getRestartTimes() < 0 - || committerKey.equals(event.getCommitterKey())); + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } //////////////////////////// APIs provided for metric framework //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 4e2189dd5a1..aede0e994d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -177,8 +177,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { return committerKey.getPipeName().equals(event.getPipeName()) && committerKey.getCreationTime() == event.getCreationTime() && committerKey.getRegionId() == event.getRegionId() - && (committerKey.getRestartTimes() < 0 - || committerKey.equals(event.getCommitterKey())); + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } public synchronized void decreaseEventsReferenceCount( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index b542627f942..32a2c191048 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -880,8 +880,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { return committerKey.getPipeName().equals(event.getPipeName()) && committerKey.getCreationTime() == event.getCreationTime() && committerKey.getRegionId() == event.getRegionId() - && (committerKey.getRestartTimes() < 0 - || committerKey.equals(event.getCommitterKey())); + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 3702da3501a..baddf4727d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -529,8 +529,7 @@ public class WebSocketConnectorServer extends WebSocketServer { return committerKey.getPipeName().equals(event.getPipeName()) && committerKey.getCreationTime() == event.getCreationTime() && committerKey.getRegionId() == event.getRegionId() - && (committerKey.getRestartTimes() < 0 - || committerKey.equals(event.getCommitterKey())); + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } private boolean isQueueAvailable( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 9ae5f3873dd..c430e3f6b06 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -144,7 +144,8 @@ public abstract class BlockingPendingQueue<E extends Event> { droppedPipeTaskKeys.add(committerKey); pendingQueue.removeIf( event -> { - if (event instanceof EnrichedEvent && isEventFromPipe((EnrichedEvent) event, committerKey)) { + if (event instanceof EnrichedEvent + && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -197,8 +198,7 @@ public abstract class BlockingPendingQueue<E extends Event> { return committerKey.getPipeName().equals(event.getPipeName()) && committerKey.getCreationTime() == event.getCreationTime() && committerKey.getRegionId() == event.getRegionId() - && (committerKey.getRestartTimes() < 0 - || committerKey.equals(event.getCommitterKey())); + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } protected boolean isEventFromDroppedPipe(final E event) {
