This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ca52520f482 [To dev/1.3] Fix pipe drop event discard with
restart-aware committer keys (#17748) (#17778)
ca52520f482 is described below
commit ca52520f4823397fa00eeba8ea1ecd5c4aba3345
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 28 17:16:14 2026 +0800
[To dev/1.3] Fix pipe drop event discard with restart-aware committer keys
(#17748) (#17778)
---
.../agent/task/connection/PipeEventCollector.java | 5 ++-
.../sink/PipeRealtimePriorityBlockingQueue.java | 10 +++--
.../agent/task/subtask/sink/PipeSinkSubtask.java | 25 ++++++-----
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 9 ++--
.../task/subtask/sink/PipeSinkSubtaskManager.java | 6 ++-
.../evolvable/batch/PipeTabletEventBatch.java | 17 ++++++--
.../batch/PipeTransferBatchReqBuilder.java | 11 +++--
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 8 +++-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 41 ++++++++---------
.../thrift/sync/IoTDBDataRegionSyncSink.java | 8 +++-
.../websocket/WebSocketConnectorServer.java | 51 ++++++++++------------
.../sink/protocol/websocket/WebSocketSink.java | 8 ++++
.../subtask/SubscriptionSinkSubtaskLifeCycle.java | 4 +-
.../subtask/SubscriptionSinkSubtaskManager.java | 7 ++-
.../task/subtask/sink/PipeSinkSubtaskTest.java | 6 ++-
.../task/connection/BlockingPendingQueue.java | 39 ++++++++++++-----
.../task/progress/PipeEventCommitManager.java | 5 +++
.../protocol/PipeConnectorWithEventDiscard.java | 7 +++
18 files changed, 171 insertions(+), 96 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 95e8196ad38..f0cc4621612 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -202,7 +202,10 @@ public class PipeEventCollector implements EventCollector {
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
if (enrichedEvent.getPipeName() != null
- && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(),
creationTime, regionId)) {
+ && (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
+ || (enrichedEvent.getCommitterKey() == null
+ && pendingQueue.isPipeDropped(
+ enrichedEvent.getPipeName(), creationTime, regionId)))) {
enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index f972bba0e6e..e35763c3012 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -360,12 +360,16 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
@Override
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
+ super.discardEventsOfPipe(committerKey);
tsfileInsertEventDeque.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isEventFromPipe(
- ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isEventFromPipe((EnrichedEvent) event, committerKey)) {
if (((EnrichedEvent) event)
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 1e7c50f389e..c855eb57140 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -201,10 +202,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
* When a pipe is dropped, the connector maybe reused and will not be
closed. So we just discard
* its queued events in the output pipe connector.
*/
- public void discardEventsOfPipe(
- final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
// Try to remove the events as much as possible
- inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop,
regionId);
+ inputPendingQueue.discardEventsOfPipe(committerKey);
try {
increaseHighPriorityTaskCount();
@@ -217,9 +217,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
// use a new thread to stop all the pipes, we will not encounter
deadlock here. Or else we
// will.
if (lastEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
- && creationTimeToDrop == ((EnrichedEvent)
lastEvent).getCreationTime()
- && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
+ && isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) {
// Do not clear the last event's reference counts because it may be
on transferring
lastEvent = null;
// Submit self to avoid that the lastEvent has been retried "max
times" times and has
@@ -241,9 +239,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
// clear the lastExceptionEvent. It's safe to potentially clear it
twice because we have the
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())
- && creationTimeToDrop == ((EnrichedEvent)
lastExceptionEvent).getCreationTime()
- && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId())
{
+ && isEventFromPipe((EnrichedEvent) lastExceptionEvent,
committerKey)) {
clearReferenceCountAndReleaseLastExceptionEvent();
}
}
@@ -252,11 +248,18 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
if (outputPipeConnector instanceof PipeConnectorWithEventDiscard) {
- ((PipeConnectorWithEventDiscard) outputPipeConnector)
- .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+ ((PipeConnectorWithEventDiscard)
outputPipeConnector).discardEventsOfPipe(committerKey);
}
}
+ private static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
+ }
+
//////////////////////////// APIs provided for metric framework
////////////////////////////
public String getAttributeSortedString() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 1780f5a87ef..61a064e1637 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.pipe.api.event.Event;
@@ -86,19 +87,17 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
* Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be
inconsistent with the
* {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel
connector scheduling.
*
- * @param pipeNameToDeregister pipe name
- * @param regionId region id
+ * @param committerKey committer key of the pipe task to deregister
* @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle,
indicating that the
* {@link PipeSinkSubtask} should never be used again
* @throws IllegalStateException if {@link
PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
*/
- public synchronized boolean deregister(
- final String pipeNameToDeregister, final long creationTimeToDeregister,
final int regionId) {
+ public synchronized boolean deregister(final CommitterKey committerKey) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
- subtask.discardEventsOfPipe(pipeNameToDeregister,
creationTimeToDeregister, regionId);
+ subtask.discardEventsOfPipe(committerKey);
try {
if (registeredTaskCount > 1) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index d7f81c12dbc..817471c785a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -209,7 +210,10 @@ public class PipeSinkSubtaskManager {
// Shall not be empty
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
- lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
+ final CommitterKey committerKey =
+ PipeEventCommitManager.getInstance().getCommitterKey(pipeName,
creationTime, regionId);
+
+ lifeCycles.removeIf(o -> o.deregister(committerKey));
if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index c44e12a4bbf..7058b885750 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -156,11 +157,13 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
*/
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
events.removeIf(
event -> {
- if (pipeNameToDrop.equals(event.getPipeName())
- && creationTimeToDrop == event.getCreationTime()
- && regionId == event.getRegionId()) {
+ if (isEventFromPipe(event, committerKey)) {
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
return true;
}
@@ -168,6 +171,14 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
});
}
+ private static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
+ }
+
public synchronized void decreaseEventsReferenceCount(
final String holderMessage, final boolean shouldReport) {
events.forEach(event -> event.decreaseReferenceCount(holderMessage,
shouldReport));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index ac5f568f1c6..45264138596 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -197,10 +198,12 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop,
regionId);
- endPointToBatch
- .values()
- .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
+ defaultBatch.discardEventsOfPipe(committerKey);
+ endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(committerKey));
}
public int size() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 13bcb537ae1..81e745dc6a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -546,8 +547,13 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
+ tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index fe3d44bedb4..69dcb922e4a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -123,9 +123,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
private final Map<PipeTransferTrackableHandler,
PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();
- // Pipe name, creation time, region id
- private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
- ConcurrentHashMap.newKeySet();
+ private final Set<CommitterKey> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;
@@ -722,16 +720,20 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
+ droppedPipeTaskKeys.add(committerKey);
if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
+ tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isDroppedPipe(
- (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
@@ -742,8 +744,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
retryTsFileQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isDroppedPipe(
- (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
@@ -845,18 +846,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
}
private boolean isDroppedPipe(final EnrichedEvent event) {
- return droppedPipeTaskKeys.contains(
- new Triple<>(event.getPipeName(), event.getCreationTime(),
event.getRegionId()));
- }
-
- private static boolean isDroppedPipe(
- final EnrichedEvent event,
- final String pipeNameToDrop,
- final long creationTimeToDrop,
- final int regionId) {
- return pipeNameToDrop.equals(event.getPipeName())
- && creationTimeToDrop == event.getCreationTime()
- && regionId == event.getRegionId();
+ return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event,
key));
+ }
+
+ private static boolean isDroppedPipe(final EnrichedEvent event, final
CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 552b8cf1cae..ef3d59f0d2a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
@@ -523,8 +524,13 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
+ tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 2a8b5c8c3c0..0ecd1ad6e34 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.websocket;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
@@ -59,9 +59,7 @@ public class WebSocketConnectorServer extends WebSocketServer
{
private final ConcurrentHashMap<String, ConcurrentHashMap<Long,
EventWaitingForAck>>
eventsWaitingForAck = new ConcurrentHashMap<>();
- // Pipe name, creation time, region id
- private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
- ConcurrentHashMap.newKeySet();
+ private final Set<CommitterKey> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
private final BidiMap<String, WebSocket> router =
new DualTreeBidiMap<String, WebSocket>(null,
Comparator.comparing(Object::hashCode)) {};
@@ -117,33 +115,33 @@ public class WebSocketConnectorServer extends
WebSocketServer {
.forEach((eventId, eventWrapper) ->
discardEvent(eventWrapper.event));
}
- droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName));
+ droppedPipeTaskKeys.removeIf(key -> key.getPipeName().equals(pipeName));
}
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
+ droppedPipeTaskKeys.add(committerKey);
final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
- eventsWaitingForTransfer.get(pipeNameToDrop);
+ eventsWaitingForTransfer.get(committerKey.getPipeName());
if (eventTransferQueue != null) {
eventTransferQueue.removeIf(
- eventWrapper ->
- discardIfMatches(eventWrapper.event, pipeNameToDrop,
creationTimeToDrop, regionId));
+ eventWrapper -> discardIfMatches(eventWrapper.event, committerKey));
synchronized (eventTransferQueue) {
eventTransferQueue.notifyAll();
}
}
final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
- eventsWaitingForAck.get(pipeNameToDrop);
+ eventsWaitingForAck.get(committerKey.getPipeName());
if (eventId2EventMap != null) {
eventId2EventMap
.entrySet()
- .removeIf(
- entry ->
- discardIfMatches(
- entry.getValue().event, pipeNameToDrop,
creationTimeToDrop, regionId));
+ .removeIf(entry -> discardIfMatches(entry.getValue().event,
committerKey));
}
}
@@ -515,19 +513,13 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
}
- private boolean discardIfMatches(
- final Event event,
- final String pipeNameToDrop,
- final long creationTimeToDrop,
- final int regionId) {
+ private boolean discardIfMatches(final Event event, final CommitterKey
committerKey) {
if (!(event instanceof EnrichedEvent)) {
return false;
}
final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
- if (!pipeNameToDrop.equals(enrichedEvent.getPipeName())
- || creationTimeToDrop != enrichedEvent.getCreationTime()
- || regionId != enrichedEvent.getRegionId()) {
+ if (!isEventFromPipe(enrichedEvent, committerKey)) {
return false;
}
@@ -537,11 +529,16 @@ public class WebSocketConnectorServer extends
WebSocketServer {
private boolean isDroppedPipe(final Event event) {
return event instanceof EnrichedEvent
- && droppedPipeTaskKeys.contains(
- new Triple<>(
- ((EnrichedEvent) event).getPipeName(),
- ((EnrichedEvent) event).getCreationTime(),
- ((EnrichedEvent) event).getRegionId()));
+ && droppedPipeTaskKeys.stream()
+ .anyMatch(key -> isEventFromPipe((EnrichedEvent) event, key));
+ }
+
+ private static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
}
private boolean isQueueAvailable(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index 40fccc12c99..7841e0199b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.websocket;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
@@ -173,6 +174,13 @@ public class WebSocketSink implements PipeConnector,
PipeConnectorWithEventDisca
}
}
+ @Override
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
+ if (server != null) {
+ server.discardEventsOfPipe(committerKey);
+ }
+ }
+
public void commit(EnrichedEvent enrichedEvent) {
Optional.ofNullable(enrichedEvent)
.ifPresent(event ->
event.decreaseReferenceCount(WebSocketSink.class.getName(), true));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
index af871feaa7e..c24098f44fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.subscription.task.subtask;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
import
org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtaskLifeCycle;
@@ -63,8 +64,7 @@ public class SubscriptionSinkSubtaskLifeCycle extends
PipeSinkSubtaskLifeCycle {
}
@Override
- public synchronized boolean deregister(
- final String pipeNameToDeregister, final long creationTimeToDeregister,
final int regionId) {
+ public synchronized boolean deregister(final CommitterKey committerKey) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index f4547673eaa..1c081888b35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.task.subtask;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -167,7 +168,11 @@ public class SubscriptionSinkSubtaskManager {
final PipeSinkSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
- if (lifeCycle.deregister(pipeName, creationTime, regionId)) {
+
+ final CommitterKey committerKey =
+ PipeEventCommitManager.getInstance().getCommitterKey(pipeName,
creationTime, regionId);
+
+ if (lifeCycle.deregister(committerKey)) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
index ddfc699721b..2a15fb9ea18 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -51,9 +52,10 @@ public class PipeSinkSubtaskTest {
connector));
try {
- subtask.discardEventsOfPipe("pipe", 1L, 1);
+ final CommitterKey committerKey = new CommitterKey("pipe", 1L, 1, -1);
+ subtask.discardEventsOfPipe(committerKey);
- verify((PipeConnectorWithEventDiscard)
connector).discardEventsOfPipe("pipe", 1L, 1);
+ verify((PipeConnectorWithEventDiscard)
connector).discardEventsOfPipe(committerKey);
} finally {
subtask.close();
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 8d920121363..c7b91f36d22 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.commons.pipe.agent.task.connection;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.pipe.api.event.Event;
@@ -47,9 +47,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
- // Pipe name, creation time, region id
- protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
- ConcurrentHashMap.newKeySet();
+ protected final Set<CommitterKey> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
protected BlockingPendingQueue(
final BlockingQueue<E> pendingQueue, final PipeEventCounter
eventCounter) {
@@ -138,12 +136,15 @@ public abstract class BlockingPendingQueue<E extends Event> {
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
+ droppedPipeTaskKeys.add(committerKey);
pendingQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isEventFromPipe(
- ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isEventFromPipe((EnrichedEvent) event, committerKey)) {
if (((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
}
@@ -191,16 +192,30 @@ public abstract class BlockingPendingQueue<E extends Event> {
&& regionId == event.getRegionId();
}
+ protected static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
+ }
+
protected boolean isEventFromDroppedPipe(final E event) {
return event instanceof EnrichedEvent
&& ((EnrichedEvent) event).getPipeName() != null
- && isPipeDropped(
- ((EnrichedEvent) event).getPipeName(),
- ((EnrichedEvent) event).getCreationTime(),
- ((EnrichedEvent) event).getRegionId());
+ && isEventFromDroppedPipe((EnrichedEvent) event);
+ }
+
+ public boolean isEventFromDroppedPipe(final EnrichedEvent event) {
+ return droppedPipeTaskKeys.stream().anyMatch(key -> isEventFromPipe(event,
key));
}
public boolean isPipeDropped(final String pipeName, final long creationTime,
final int regionId) {
- return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime,
regionId));
+ return droppedPipeTaskKeys.stream()
+ .anyMatch(
+ key ->
+ key.getPipeName().equals(pipeName)
+ && key.getCreationTime() == creationTime
+ && key.getRegionId() == regionId);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index 7056b052a3e..9e1653a2516 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -169,6 +169,11 @@ public class PipeEventCommitManager {
return true;
}
+ public CommitterKey getCommitterKey(
+ final String pipeName, final long creationTime, final int regionId) {
+ return generateCommitterKey(pipeName, creationTime, regionId);
+ }
+
private CommitterKey generateCommitterKey(
final String pipeName, final long creationTime, final int regionId) {
return taskAgent.getCommitterKey(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
index ab4dbcf9075..4ffc0c25ed2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
@@ -19,7 +19,14 @@
package org.apache.iotdb.commons.pipe.sink.protocol;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+
public interface PipeConnectorWithEventDiscard {
void discardEventsOfPipe(String pipeName, long creationTime, int regionId);
+
+ default void discardEventsOfPipe(final CommitterKey committerKey) {
+ discardEventsOfPipe(
+ committerKey.getPipeName(), committerKey.getCreationTime(),
committerKey.getRegionId());
+ }
}