This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch drop-pipe-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a3b7ea0bcd4173a562ce1f9d50c5728f79171db6 Author: Caideyipi <[email protected]> AuthorDate: Mon Apr 27 14:28:36 2026 +0800 drop --- .../agent/task/connection/PipeEventCollector.java | 18 +++-- .../sink/PipeRealtimePriorityBlockingQueue.java | 4 +- .../task/connection/PipeEventCollectorTest.java | 87 ++++++++++++++++++++++ .../task/connection/BlockingPendingQueue.java | 40 +++++++++- 4 files changed, 139 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index a22848ee3ba..b000c5d2366 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -220,7 +220,8 @@ public class PipeEventCollector implements EventCollector { private void collectEvent(final Event event) { if (event instanceof EnrichedEvent) { - if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) { LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event); isFailedToIncreaseReferenceCount = true; return; @@ -228,18 +229,25 @@ public class PipeEventCollector implements EventCollector { // Assign a commit id for this event in order to report progress in order. PipeEventCommitManager.getInstance() - .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId); + .enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, regionId); // Assign a rebootTime for iotConsensusV2 - ((EnrichedEvent) event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + + if (enrichedEvent.getPipeName() != null + && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) { + enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName()); + return; + } } if (event instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue); } - pendingQueue.offer(event); - collectInvocationCount.incrementAndGet(); + if (pendingQueue.offer(event)) { + collectInvocationCount.incrementAndGet(); + } } public void resetFlags() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 3d553f73595..f972bba0e6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -73,7 +73,9 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ @Override public boolean offer(final Event event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } if (event instanceof TsFileInsertionEvent) { tsfileInsertEventDeque.add((TsFileInsertionEvent) event); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java new file mode 100644 index 00000000000..bbd5e0b5e3d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.connection; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeRealtimePriorityBlockingQueue; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.pipe.api.event.Event; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class PipeEventCollectorTest { + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToUnboundedPendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe( + new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter())); + } + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToRealtimePendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe(new PipeRealtimePriorityBlockingQueue()); + } + + private void verifyCollectorDoesNotOfferEventsOfDroppedPipe( + final UnboundedBlockingPendingQueue<Event> pendingQueue) { + pendingQueue.discardEventsOfPipe("pipe", 1L, 1); + + final PipeEventCollector droppedPipeCollector = + new PipeEventCollector(pendingQueue, 1L, 1, false, false, false); + final PipeRawTabletInsertionEvent droppedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 1L); + droppedPipeCollector.collect(droppedPipeEvent); + + Assert.assertTrue(droppedPipeEvent.isReleased()); + Assert.assertEquals(0, pendingQueue.size()); + + final PipeEventCollector recreatedPipeCollector = + new PipeEventCollector(pendingQueue, 2L, 1, false, false, false); + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L); + recreatedPipeCollector.collect(recreatedPipeEvent); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + Assert.assertEquals(1, pendingQueue.size()); + + pendingQueue.discardAllEvents(); + Assert.assertTrue(recreatedPipeEvent.isReleased()); + } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime) { + final List<IMeasurementSchema> schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d1", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index b3b796ab6d8..7080a2fe6f9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -28,8 +28,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Set; import java.util.function.Consumer; public abstract class BlockingPendingQueue<E extends Event> { @@ -43,6 +45,7 @@ public abstract class BlockingPendingQueue<E extends Event> { protected final PipeEventCounter eventCounter; protected final AtomicBoolean isClosed = new AtomicBoolean(false); + protected final Set<String> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); protected BlockingPendingQueue( final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) { @@ -51,7 +54,10 @@ public abstract class BlockingPendingQueue<E extends Event> { } public boolean offer(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } + final boolean offered = pendingQueue.offer(event); if (offered) { eventCounter.increaseEventCount(event); @@ -60,7 +66,9 @@ public abstract class BlockingPendingQueue<E extends Event> { } public boolean put(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } try { pendingQueue.put(event); eventCounter.increaseEventCount(event); @@ -101,6 +109,7 @@ public abstract class BlockingPendingQueue<E extends Event> { isClosed.set(true); pendingQueue.clear(); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } /** DO NOT FORGET to set eventCounter to new value after invoking this method. */ @@ -120,10 +129,12 @@ public abstract class BlockingPendingQueue<E extends Event> { return true; }); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent @@ -158,10 +169,12 @@ public abstract class BlockingPendingQueue<E extends Event> { return eventCounter.getPipeHeartbeatEventCount(); } - protected void checkBeforeOffer(final E event) { - if (isClosed.get() && event instanceof EnrichedEvent) { + protected boolean checkBeforeOffer(final E event) { + final boolean shouldReject = isClosed.get() || isEventFromDroppedPipe(event); + if (shouldReject && event instanceof EnrichedEvent) { ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName()); } + return !shouldReject; } protected static boolean isEventFromPipe( @@ -173,4 +186,23 @@ public abstract class BlockingPendingQueue<E extends Event> { && creationTimeToDrop == event.getCreationTime() && regionId == event.getRegionId(); } + + protected boolean isEventFromDroppedPipe(final E event) { + return event instanceof EnrichedEvent + && ((EnrichedEvent) event).getPipeName() != null + && isPipeDropped( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId()); + } + + public boolean isPipeDropped( + final String pipeName, final long creationTime, final int regionId) { + return droppedPipeTaskKeys.contains(generatePipeTaskKey(pipeName, creationTime, regionId)); + } + + private static String generatePipeTaskKey( + final String pipeName, final long creationTime, final int regionId) { + return pipeName + "_" + creationTime + "_" + regionId; + } }
