This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch Fix-drop in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3f4ddd2febd37e6e5a58e9f28f6b5ab9cbce7d5e Author: Caideyipi <[email protected]> AuthorDate: Fri May 22 16:43:17 2026 +0800 Fix --- .../agent/task/connection/PipeEventCollector.java | 2 +- .../sink/PipeRealtimePriorityBlockingQueue.java | 11 +++-- .../agent/task/subtask/sink/PipeSinkSubtask.java | 26 ++++++----- .../subtask/sink/PipeSinkSubtaskLifeCycle.java | 9 ++-- .../task/subtask/sink/PipeSinkSubtaskManager.java | 6 ++- .../evolvable/batch/PipeTabletEventBatch.java | 18 ++++++-- .../batch/PipeTransferBatchReqBuilder.java | 11 +++-- .../protocol/airgap/IoTDBDataRegionAirGapSink.java | 8 +++- .../thrift/async/IoTDBDataRegionAsyncSink.java | 42 +++++++++-------- .../thrift/sync/IoTDBDataRegionSyncSink.java | 8 +++- .../websocket/WebSocketConnectorServer.java | 52 +++++++++++----------- .../sink/protocol/websocket/WebSocketSink.java | 8 ++++ .../task/connection/BlockingPendingQueue.java | 41 +++++++++++------ .../task/progress/PipeEventCommitManager.java | 5 +++ .../protocol/PipeConnectorWithEventDiscard.java | 7 +++ 15 files changed, 161 insertions(+), 93 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index ad44b78042a..c46d4a71343 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -238,7 +238,7 @@ public class PipeEventCollector implements EventCollector { enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); if (enrichedEvent.getPipeName() != null - && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) { + && pendingQueue.isEventFromDroppedPipe(enrichedEvent)) { enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName()); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 4b65746b3ab..bea5fcfe854 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -356,12 +356,15 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ @Override public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public void discardEventsOfPipe(final CommitterKey committerKey) { + super.discardEventsOfPipe(committerKey); tsfileInsertEventDeque.removeIf( event -> { - if (event instanceof EnrichedEvent - && isEventFromPipe( - ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { + if (event instanceof EnrichedEvent && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 560512521e7..b633efe3129 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -201,10 +202,9 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe( - final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + public void discardEventsOfPipe(final CommitterKey committerKey) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + inputPendingQueue.discardEventsOfPipe(committerKey); try { increaseHighPriorityTaskCount(); @@ -217,9 +217,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // use a new thread to stop all the pipes, we will not encounter deadlock here. Or else we // will. if (lastEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) - && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime() - && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { + && isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) { // Do not clear the last event's reference counts because it may be on transferring lastEvent = null; // Submit self to avoid that the lastEvent has been retried "max times" times and has @@ -241,9 +239,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) - && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime() - && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { + && isEventFromPipe((EnrichedEvent) lastExceptionEvent, committerKey)) { clearReferenceCountAndReleaseLastExceptionEvent(); } } @@ -252,11 +248,19 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { } if (outputPipeSink instanceof PipeConnectorWithEventDiscard) { - ((PipeConnectorWithEventDiscard) outputPipeSink) - .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + ((PipeConnectorWithEventDiscard) outputPipeSink).discardEventsOfPipe(committerKey); } } + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 + || committerKey.equals(event.getCommitterKey())); + } + //////////////////////////// APIs provided for metric framework //////////////////////////// public String getAttributeSortedString() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 85634277627..42b1ae91366 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -87,19 +88,17 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable { * Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be inconsistent with the * {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel connector scheduling. * - * @param pipeNameToDeregister pipe name - * @param regionId region id + * @param committerKey committer key of the pipe task to deregister * @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, indicating that the * {@link PipeSinkSubtask} should never be used again * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister( - final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { + public synchronized boolean deregister(final CommitterKey committerKey) { if (registeredTaskCount <= 0) { throw new IllegalStateException(DataNodePipeMessages.REGISTEREDTASKCOUNT_0_1); } - subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId); + subtask.discardEventsOfPipe(committerKey); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 367b9210406..3ad99ca5c06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; @@ -211,7 +212,10 @@ public class PipeSinkSubtaskManager { // Shall not be empty final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor; - lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId)); + final CommitterKey committerKey = + PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId); + + lifeCycles.removeIf(o -> o.deregister(committerKey)); if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 8bf69e6e6b0..4e2189dd5a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -157,11 +158,13 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { */ public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName()) - && creationTimeToDrop == event.getCreationTime() - && regionId == event.getRegionId()) { + if (isEventFromPipe(event, committerKey)) { event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); return true; } @@ -169,6 +172,15 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { }); } + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 + || committerKey.equals(event.getCommitterKey())); + } + public synchronized void decreaseEventsReferenceCount( final String holderMessage, final boolean shouldReport) { events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 3bec537614c..b3a8884a146 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -201,10 +202,12 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); - endPointToBatch - .values() - .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + defaultBatch.discardEventsOfPipe(committerKey); + endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 649ef35c4ce..ea83524988b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.utils.RetryUtils; @@ -613,8 +614,13 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 9adbcf6cf16..b542627f942 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -23,8 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; @@ -130,9 +130,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap<>(); - // Pipe name, creation time, region id - private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + private final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -749,16 +747,20 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isDroppedPipe( - (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { + && isDroppedPipe((EnrichedEvent) event, committerKey)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -769,8 +771,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { retryTsFileQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isDroppedPipe( - (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { + && isDroppedPipe((EnrichedEvent) event, committerKey)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -872,18 +873,15 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { } private boolean isDroppedPipe(final EnrichedEvent event) { - return droppedPipeTaskKeys.contains( - new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId())); - } - - private static boolean isDroppedPipe( - final EnrichedEvent event, - final String pipeNameToDrop, - final long creationTimeToDrop, - final int regionId) { - return pipeNameToDrop.equals(event.getPipeName()) - && creationTimeToDrop == event.getCreationTime() - && regionId == event.getRegionId(); + return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, key)); + } + + private static boolean isDroppedPipe(final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 + || committerKey.equals(event.getCommitterKey())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 5e6297d8438..d9e25f5e09f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; @@ -604,8 +605,13 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index bbb4cb9a3a8..3702da3501a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; import org.apache.iotdb.commons.external.collections4.BidiMap; import org.apache.iotdb.commons.external.collections4.bidimap.DualTreeBidiMap; -import org.apache.iotdb.commons.pipe.datastructure.Triple; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -60,9 +60,7 @@ public class WebSocketConnectorServer extends WebSocketServer { private final ConcurrentHashMap<String, ConcurrentHashMap<Long, EventWaitingForAck>> eventsWaitingForAck = new ConcurrentHashMap<>(); - // Pipe name, creation time, region id - private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + private final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private final BidiMap<String, WebSocket> router = new DualTreeBidiMap<String, WebSocket>(null, Comparator.comparing(Object::hashCode)) {}; @@ -118,33 +116,33 @@ public class WebSocketConnectorServer extends WebSocketServer { .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); } - droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName)); + droppedPipeTaskKeys.removeIf(key -> key.getPipeName().equals(pipeName)); } public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue = - eventsWaitingForTransfer.get(pipeNameToDrop); + eventsWaitingForTransfer.get(committerKey.getPipeName()); if (eventTransferQueue != null) { eventTransferQueue.removeIf( - eventWrapper -> - discardIfMatches(eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + eventWrapper -> discardIfMatches(eventWrapper.event, committerKey)); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } } final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap = - eventsWaitingForAck.get(pipeNameToDrop); + eventsWaitingForAck.get(committerKey.getPipeName()); if (eventId2EventMap != null) { eventId2EventMap .entrySet() - .removeIf( - entry -> - discardIfMatches( - entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); + .removeIf(entry -> discardIfMatches(entry.getValue().event, committerKey)); } } @@ -506,19 +504,13 @@ public class WebSocketConnectorServer extends WebSocketServer { } } - private boolean discardIfMatches( - final Event event, - final String pipeNameToDrop, - final long creationTimeToDrop, - final int regionId) { + private boolean discardIfMatches(final Event event, final CommitterKey committerKey) { if (!(event instanceof EnrichedEvent)) { return false; } final EnrichedEvent enrichedEvent = (EnrichedEvent) event; - if (!pipeNameToDrop.equals(enrichedEvent.getPipeName()) - || creationTimeToDrop != enrichedEvent.getCreationTime() - || regionId != enrichedEvent.getRegionId()) { + if (!isEventFromPipe(enrichedEvent, committerKey)) { return false; } @@ -528,11 +520,17 @@ public class WebSocketConnectorServer extends WebSocketServer { private boolean isDroppedPipe(final Event event) { return event instanceof EnrichedEvent - && droppedPipeTaskKeys.contains( - new Triple<>( - ((EnrichedEvent) event).getPipeName(), - ((EnrichedEvent) event).getCreationTime(), - ((EnrichedEvent) event).getRegionId())); + && droppedPipeTaskKeys.stream() + .anyMatch(key -> isEventFromPipe((EnrichedEvent) event, key)); + } + + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 + || committerKey.equals(event.getCommitterKey())); } private boolean isQueueAvailable( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index 3c487ff1356..06eab035b4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; @@ -177,6 +178,13 @@ public class WebSocketSink implements PipeConnector, PipeConnectorWithEventDisca } } + @Override + public void discardEventsOfPipe(final CommitterKey committerKey) { + if (server != null) { + server.discardEventsOfPipe(committerKey); + } + } + public void commit(EnrichedEvent enrichedEvent) { Optional.ofNullable(enrichedEvent) .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index c56e8143ef5..9ae5f3873dd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -20,8 +20,8 @@ package org.apache.iotdb.commons.pipe.agent.task.connection; import org.apache.iotdb.commons.i18n.PipeMessages; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.pipe.api.event.Event; @@ -48,9 +48,7 @@ public abstract class BlockingPendingQueue<E extends Event> { protected final AtomicBoolean isClosed = new AtomicBoolean(false); - // Pipe name, creation time, region id - protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + protected final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); protected BlockingPendingQueue( final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) { @@ -139,12 +137,14 @@ public abstract class BlockingPendingQueue<E extends Event> { public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); pendingQueue.removeIf( event -> { - if (event instanceof EnrichedEvent - && isEventFromPipe( - ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { + if (event instanceof EnrichedEvent && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -192,16 +192,31 @@ public abstract class BlockingPendingQueue<E extends Event> { && regionId == event.getRegionId(); } + protected static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 + || committerKey.equals(event.getCommitterKey())); + } + protected boolean isEventFromDroppedPipe(final E event) { return event instanceof EnrichedEvent && ((EnrichedEvent) event).getPipeName() != null - && isPipeDropped( - ((EnrichedEvent) event).getPipeName(), - ((EnrichedEvent) event).getCreationTime(), - ((EnrichedEvent) event).getRegionId()); + && isEventFromDroppedPipe((EnrichedEvent) event); + } + + public boolean isEventFromDroppedPipe(final EnrichedEvent event) { + return droppedPipeTaskKeys.stream().anyMatch(key -> isEventFromPipe(event, key)); } public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { - return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, regionId)); + return droppedPipeTaskKeys.stream() + .anyMatch( + key -> + key.getPipeName().equals(pipeName) + && key.getCreationTime() == creationTime + && key.getRegionId() == regionId); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index f2c3a73e18c..26e7ea305d5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -168,6 +168,11 @@ public class PipeEventCommitManager { return true; } + public CommitterKey getCommitterKey( + final String pipeName, final long creationTime, final int regionId) { + return generateCommitterKey(pipeName, creationTime, regionId); + } + private CommitterKey generateCommitterKey( final String pipeName, final long creationTime, final int regionId) { return taskAgent.getCommitterKey( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java index ab4dbcf9075..4ffc0c25ed2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java @@ -19,7 +19,14 @@ package org.apache.iotdb.commons.pipe.sink.protocol; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; + public interface PipeConnectorWithEventDiscard { void discardEventsOfPipe(String pipeName, long creationTime, int regionId); + + default void discardEventsOfPipe(final CommitterKey committerKey) { + discardEventsOfPipe( + committerKey.getPipeName(), committerKey.getCreationTime(), committerKey.getRegionId()); + } }
