This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch drop-pipe-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4ec94cd7bf7668a7bbc57ffb14ae68317f8832a6 Author: Caideyipi <[email protected]> AuthorDate: Mon Apr 27 11:18:40 2026 +0800 drop-1 --- .../sink/PipeRealtimePriorityBlockingQueue.java | 9 ++-- .../agent/task/subtask/sink/PipeSinkSubtask.java | 10 ++-- .../subtask/sink/PipeSinkSubtaskLifeCycle.java | 5 +- .../task/subtask/sink/PipeSinkSubtaskManager.java | 2 +- .../evolvable/batch/PipeTabletEventBatch.java | 7 ++- .../batch/PipeTransferBatchReqBuilder.java | 8 ++-- .../thrift/async/IoTDBDataRegionAsyncSink.java | 51 ++++++++++++++++---- .../thrift/sync/IoTDBDataRegionSyncSink.java | 5 +- .../subtask/SubscriptionSinkSubtaskLifeCycle.java | 5 +- .../subtask/SubscriptionSinkSubtaskManager.java | 2 +- .../apache/iotdb/db/pipe/sink/PipeSinkTest.java | 53 +++++++++++++++++++++ .../opc_security/8443_12686/iotdb-server.pfx | Bin 0 -> 2942 bytes .../task/connection/BlockingPendingQueue.java | 17 +++++-- .../commons/pipe/sink/protocol/IoTDBSink.java | 3 +- 14 files changed, 145 insertions(+), 32 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 6d227ac31fd..3d553f73595 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -356,13 +356,14 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ } @Override - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - super.discardEventsOfPipe(pipeNameToDrop, regionId); + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); tsfileInsertEventDeque.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 6e26a76d774..f6008822e61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -198,9 +198,10 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId); + inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); try { increaseHighPriorityTaskCount(); @@ -214,6 +215,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // will. if (lastEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime() && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { // Do not clear the last event's reference counts because it may be on transferring lastEvent = null; @@ -237,6 +239,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime() && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { clearReferenceCountAndReleaseLastExceptionEvent(); } @@ -246,7 +249,8 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { } if (outputPipeSink instanceof IoTDBSink) { - ((IoTDBSink) outputPipeSink).discardEventsOfPipe(pipeNameToDrop, regionId); + ((IoTDBSink) outputPipeSink) + .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 35f7983075d..1780f5a87ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -92,12 +92,13 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable { * {@link PipeSinkSubtask} should never be used again * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } - subtask.discardEventsOfPipe(pipeNameToDeregister, regionId); + subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 9138a075918..4faa42db004 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -209,7 +209,7 @@ public class PipeSinkSubtaskManager { // Shall not be empty final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor; - lifeCycles.removeIf(o -> o.deregister(pipeName, regionId)); + lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId)); if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 96bddd0d672..c44e12a4bbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -154,10 +154,13 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { * Discard all events of the given pipe. This method only clears the reference count of the events * and discard them, but do not modify other objects (such as buffers) for simplicity. */ - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) { + if (pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId()) { event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index dd4d4fe1ce6..5bb76ae40e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -196,9 +196,11 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); } - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId); - endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId)); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + endPointToBatch.values().forEach( + batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index d19639310cf..c00e36e0c7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -77,6 +77,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -126,6 +127,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap<>(); + private final Set<String> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -681,8 +683,15 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { public void addFailureEventToRetryQueue(final Event event, final Exception e) { isConnectionException = e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e); - if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) { - return; + if (event instanceof EnrichedEvent) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (enrichedEvent.isReleased()) { + return; + } + if (isDroppedPipe(enrichedEvent)) { + enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); + return; + } } if (isClosed.get()) { @@ -728,15 +737,18 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { //////////////////////////// Operations for close //////////////////////////// @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - if (isTabletBatchModeEnabled) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); + + if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -747,8 +759,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { retryTsFileQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -792,6 +804,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { // clear reference count of events in retry queue after closing async client clearRetryEventsReferenceCount(); + droppedPipeTaskKeys.clear(); super.close(); } @@ -848,6 +861,26 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { this.transferTsFileCounter = transferTsFileCounter; } + private boolean isDroppedPipe(final EnrichedEvent event) { + return droppedPipeTaskKeys.contains( + generatePipeTaskKey(event.getPipeName(), event.getCreationTime(), event.getRegionId())); + } + + private static boolean isDroppedPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } + + private static String generatePipeTaskKey( + final String pipeName, final long creationTime, final int regionId) { + return pipeName + "_" + creationTime + "_" + regionId; + } + @Override public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { if (tabletBatchBuilder != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 9357a8c6a6d..e8c4420861c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -599,9 +599,10 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { } @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index 98163697374..390a6d58018 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -63,7 +63,10 @@ public class SubscriptionSinkSubtaskLifeCycle extends PipeSinkSubtaskLifeCycle { } @Override - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, + final long creationTimeToDeregister, + final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java index 6d5f27d8172..16a9ee1a03d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java @@ -168,7 +168,7 @@ public class SubscriptionSinkSubtaskManager { final PipeSinkSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - if (lifeCycle.deregister(pipeName, regionId)) { + if (lifeCycle.deregister(pipeName, creationTime, regionId)) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index ec2122b9175..7dfd0446038 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncS import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.write.record.Tablet; @@ -104,6 +106,46 @@ public class PipeSinkTest { } } + @Test + public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception { + try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap<String, String>() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + } + }); + connector.validate(new PipeParameterValidator(parameters)); + connector.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent("pipe", 1L, 1); + droppedEvent.increaseReferenceCount("test"); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 1L, 1, -1), 1L); + + connector.discardEventsOfPipe("pipe", 1L, 1); + connector.addFailureEventToRetryQueue(droppedEvent, new PipeException("test")); + + Assert.assertEquals(0, connector.getRetryEventQueueSize()); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L, 1); + recreatedPipeEvent.increaseReferenceCount("test"); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 2L, 1, -1), 1L); + + connector.addFailureEventToRetryQueue(recreatedPipeEvent, new PipeException("test")); + + Assert.assertEquals(1, connector.getRetryEventQueueSize()); + } + } + @Test public void testOpcUaSink() { final List<IMeasurementSchema> schemaList = @@ -194,4 +236,15 @@ public class PipeSinkTest { Assert.fail(); } } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime, final int regionId) { + final List<IMeasurementSchema> schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d" + regionId, schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + } } diff --git a/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx b/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx new file mode 100644 index 00000000000..b36d23da368 Binary files /dev/null and b/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx differ diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 8773b03f9f3..b3b796ab6d8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -122,12 +122,13 @@ public abstract class BlockingPendingQueue<E extends Event> { eventCounter.reset(); } - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -162,4 +163,14 @@ public abstract class BlockingPendingQueue<E extends Event> { ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName()); } } + + protected static boolean isEventFromPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 88a8b71775f..7de06376b6d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -641,7 +641,8 @@ public abstract class IoTDBSink implements PipeConnector { * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard * its batched or queued events in the output pipe connector. */ - public synchronized void discardEventsOfPipe(final String pipeName, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeName, final long creationTime, final int regionId) { // Do nothing by default }
