This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2ea083dcf6a [To dev/1.3] Pipe: Fixed the event clear logic of drop
pipe (#17560) (#17619)
2ea083dcf6a is described below
commit 2ea083dcf6add81e9bb43ef536728c7d53e47f96
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 9 14:48:31 2026 +0800
[To dev/1.3] Pipe: Fixed the event clear logic of drop pipe (#17560)
(#17619)
* Pipe: Fixed the event clear logic of drop pipe (#17560)
* drop-1
* wd
* drop
* fix
* local
* triple
* by
* spt
* bug-fix
* no-pipe-task-key
* Update IoTDBDataRegionAsyncSink.java
* triple
* Fix
* comp
* comp-fix02
* drop-n
---
.../agent/task/connection/PipeEventCollector.java | 20 ++-
.../sink/PipeRealtimePriorityBlockingQueue.java | 13 +-
.../agent/task/subtask/sink/PipeSinkSubtask.java | 13 +-
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 5 +-
.../task/subtask/sink/PipeSinkSubtaskManager.java | 2 +-
.../evolvable/batch/PipeTabletEventBatch.java | 7 +-
.../batch/PipeTransferBatchReqBuilder.java | 9 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 50 ++++++--
.../thrift/sync/IoTDBDataRegionSyncSink.java | 5 +-
.../websocket/WebSocketConnectorServer.java | 141 +++++++++++++++++----
.../sink/protocol/websocket/WebSocketSink.java | 11 +-
.../subtask/SubscriptionSinkSubtaskLifeCycle.java | 3 +-
.../subtask/SubscriptionSinkSubtaskManager.java | 2 +-
.../task/connection/PipeEventCollectorTest.java | 86 +++++++++++++
.../task/subtask/sink/PipeSinkSubtaskTest.java | 61 +++++++++
.../apache/iotdb/db/pipe/sink/PipeSinkTest.java | 95 ++++++++++++++
.../task/connection/BlockingPendingQueue.java | 55 +++++++-
.../iotdb/commons/pipe/datastructure/Triple.java | 63 +++++++++
.../commons/pipe/sink/protocol/IoTDBSink.java | 5 +-
.../protocol/PipeConnectorWithEventDiscard.java | 25 ++++
20 files changed, 599 insertions(+), 72 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 4728f62c94f..387d4ff7ec6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -178,7 +178,8 @@ public class PipeEventCollector implements EventCollector {
private void collectEvent(final Event event) {
if (event instanceof EnrichedEvent) {
- if (!((EnrichedEvent)
event).increaseReferenceCount(PipeEventCollector.class.getName())) {
+ final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+ if
(!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) {
LOGGER.warn("PipeEventCollector: The event {} is already released,
skipping it.", event);
isFailedToIncreaseReferenceCount = true;
return;
@@ -186,18 +187,25 @@ public class PipeEventCollector implements EventCollector
{
// Assign a commit id for this event in order to report progress in
order.
PipeEventCommitManager.getInstance()
- .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
creationTime, regionId);
+ .enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime,
regionId);
- // Assign a rebootTime for pipeConsensus
- ((EnrichedEvent)
event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
+ // Assign a rebootTime for iotConsensusV2
+
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
+
+ if (enrichedEvent.getPipeName() != null
+ && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(),
creationTime, regionId)) {
+ enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
+ return;
+ }
}
if (event instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}
- pendingQueue.offer(event);
- collectInvocationCount.incrementAndGet();
+ if (pendingQueue.offer(event)) {
+ collectInvocationCount.incrementAndGet();
+ }
}
public void resetFlags() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 6d227ac31fd..f972bba0e6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -73,7 +73,9 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
@Override
public boolean offer(final Event event) {
- checkBeforeOffer(event);
+ if (!checkBeforeOffer(event)) {
+ return false;
+ }
if (event instanceof TsFileInsertionEvent) {
tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
@@ -356,13 +358,14 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
}
@Override
- public void discardEventsOfPipe(final String pipeNameToDrop, final int
regionId) {
- super.discardEventsOfPipe(pipeNameToDrop, regionId);
+ public void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tsfileInsertEventDeque.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
- && regionId == ((EnrichedEvent) event).getRegionId()) {
+ && isEventFromPipe(
+ ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop,
regionId)) {
if (((EnrichedEvent) event)
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 4b4794891f6..ae48fcb2778 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
+import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -199,9 +200,10 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
* When a pipe is dropped, the connector maybe reused and will not be
closed. So we just discard
* its queued events in the output pipe connector.
*/
- public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
+ public void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
// Try to remove the events as much as possible
- inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
+ inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop,
regionId);
try {
increaseHighPriorityTaskCount();
@@ -215,6 +217,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
// will.
if (lastEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
+ && creationTimeToDrop == ((EnrichedEvent)
lastEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
// Do not clear the last event's reference counts because it may be
on transferring
lastEvent = null;
@@ -238,6 +241,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())
+ && creationTimeToDrop == ((EnrichedEvent)
lastExceptionEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId())
{
clearReferenceCountAndReleaseLastExceptionEvent();
}
@@ -246,8 +250,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
decreaseHighPriorityTaskCount();
}
- if (outputPipeConnector instanceof IoTDBSink) {
- ((IoTDBSink) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop,
regionId);
+ if (outputPipeConnector instanceof PipeConnectorWithEventDiscard) {
+ ((PipeConnectorWithEventDiscard) outputPipeConnector)
+ .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 35f7983075d..1780f5a87ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -92,12 +92,13 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
* {@link PipeSinkSubtask} should never be used again
* @throws IllegalStateException if {@link
PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
*/
- public synchronized boolean deregister(final String pipeNameToDeregister,
int regionId) {
+ public synchronized boolean deregister(
+ final String pipeNameToDeregister, final long creationTimeToDeregister,
final int regionId) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
- subtask.discardEventsOfPipe(pipeNameToDeregister, regionId);
+ subtask.discardEventsOfPipe(pipeNameToDeregister,
creationTimeToDeregister, regionId);
try {
if (registeredTaskCount > 1) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index caff425f790..d7f81c12dbc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -209,7 +209,7 @@ public class PipeSinkSubtaskManager {
// Shall not be empty
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
- lifeCycles.removeIf(o -> o.deregister(pipeName, regionId));
+ lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 96bddd0d672..c44e12a4bbf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -154,10 +154,13 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
* Discard all events of the given pipe. This method only clears the
reference count of the events
* and discard them, but do not modify other objects (such as buffers) for
simplicity.
*/
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
events.removeIf(
event -> {
- if (pipeNameToDrop.equals(event.getPipeName()) && regionId ==
event.getRegionId()) {
+ if (pipeNameToDrop.equals(event.getPipeName())
+ && creationTimeToDrop == event.getCreationTime()
+ && regionId == event.getRegionId()) {
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 9fa706e985f..ac5f568f1c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -195,9 +195,12 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
&&
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
}
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
- defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId);
- endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(pipeNameToDrop, regionId));
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop,
regionId);
+ endPointToBatch
+ .values()
+ .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId));
}
public int size() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 07381d15e28..fe3d44bedb4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -74,6 +75,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -121,6 +123,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
private final Map<PipeTransferTrackableHandler,
PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();
+ // Pipe name, creation time, region id
+ private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
+ ConcurrentHashMap.newKeySet();
+
private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;
@@ -660,8 +666,15 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
public void addFailureEventToRetryQueue(final Event event, final Exception
e) {
isConnectionException =
e instanceof PipeConnectionException ||
ThriftClient.isConnectionBroken(e);
- if (event instanceof EnrichedEvent && ((EnrichedEvent)
event).isReleased()) {
- return;
+ if (event instanceof EnrichedEvent) {
+ final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+ if (enrichedEvent.isReleased()) {
+ return;
+ }
+ if (isDroppedPipe(enrichedEvent)) {
+
enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
+ return;
+ }
}
if (isClosed.get()) {
@@ -707,15 +720,18 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
//////////////////////////// Operations for close
////////////////////////////
@Override
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
- if (isTabletBatchModeEnabled) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+
+ if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
- && regionId == ((EnrichedEvent) event).getRegionId()) {
+ && isDroppedPipe(
+ (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop,
regionId)) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
@@ -726,8 +742,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
retryTsFileQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
- && regionId == ((EnrichedEvent) event).getRegionId()) {
+ && isDroppedPipe(
+ (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop,
regionId)) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
@@ -771,6 +787,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
// clear reference count of events in retry queue after closing async
client
clearRetryEventsReferenceCount();
+ droppedPipeTaskKeys.clear();
super.close();
}
@@ -827,6 +844,21 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
this.transferTsFileCounter = transferTsFileCounter;
}
+ private boolean isDroppedPipe(final EnrichedEvent event) {
+ return droppedPipeTaskKeys.contains(
+ new Triple<>(event.getPipeName(), event.getCreationTime(),
event.getRegionId()));
+ }
+
+ private static boolean isDroppedPipe(
+ final EnrichedEvent event,
+ final String pipeNameToDrop,
+ final long creationTimeToDrop,
+ final int regionId) {
+ return pipeNameToDrop.equals(event.getPipeName())
+ && creationTimeToDrop == event.getCreationTime()
+ && regionId == event.getRegionId();
+ }
+
@Override
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
if (tabletBatchBuilder != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index a13f40b1b83..552b8cf1cae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -521,9 +521,10 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
}
@Override
- public synchronized void discardEventsOfPipe(final String pipeNameToDrop,
final int regionId) {
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
if (Objects.nonNull(tabletBatchBuilder)) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 6da64460495..2a8b5c8c3c0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.websocket;
+import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
@@ -39,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,6 +59,10 @@ public class WebSocketConnectorServer extends
WebSocketServer {
private final ConcurrentHashMap<String, ConcurrentHashMap<Long,
EventWaitingForAck>>
eventsWaitingForAck = new ConcurrentHashMap<>();
+ // Pipe name, creation time, region id
+ private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
+ ConcurrentHashMap.newKeySet();
+
private final BidiMap<String, WebSocket> router =
new DualTreeBidiMap<String, WebSocket>(null,
Comparator.comparing(Object::hashCode)) {};
@@ -97,13 +103,8 @@ public class WebSocketConnectorServer extends
WebSocketServer {
eventWrappers = new ArrayList<>(eventTransferQueue);
eventTransferQueue.clear();
}
- eventWrappers.forEach(
- (eventWrapper) -> {
- if (eventWrapper.event instanceof EnrichedEvent) {
- ((EnrichedEvent) eventWrapper.event)
-
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
- }
- });
+ eventWrappers.forEach(eventWrapper ->
discardEvent(eventWrapper.event));
+ eventWrappers.clear();
synchronized (eventTransferQueue) {
eventTransferQueue.notifyAll();
}
@@ -113,13 +114,36 @@ public class WebSocketConnectorServer extends
WebSocketServer {
if (eventsWaitingForAck.containsKey(pipeName)) {
eventsWaitingForAck
.remove(pipeName)
- .forEach(
- (eventId, eventWrapper) -> {
- if (eventWrapper.event instanceof EnrichedEvent) {
- ((EnrichedEvent) eventWrapper.event)
-
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
- }
- });
+ .forEach((eventId, eventWrapper) ->
discardEvent(eventWrapper.event));
+ }
+
+ droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName));
+ }
+
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+
+ final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
+ eventsWaitingForTransfer.get(pipeNameToDrop);
+ if (eventTransferQueue != null) {
+ eventTransferQueue.removeIf(
+ eventWrapper ->
+ discardIfMatches(eventWrapper.event, pipeNameToDrop,
creationTimeToDrop, regionId));
+ synchronized (eventTransferQueue) {
+ eventTransferQueue.notifyAll();
+ }
+ }
+
+ final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+ eventsWaitingForAck.get(pipeNameToDrop);
+ if (eventId2EventMap != null) {
+ eventId2EventMap
+ .entrySet()
+ .removeIf(
+ entry ->
+ discardIfMatches(
+ entry.getValue().event, pipeNameToDrop,
creationTimeToDrop, regionId));
}
}
@@ -300,21 +324,24 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
public void addEvent(Event event, WebSocketSink connector) {
+ if (isDroppedPipe(event)) {
+ discardEvent(event);
+ return;
+ }
+
+ final String pipeName = connector.getPipeName();
final PriorityBlockingQueue<EventWaitingForTransfer> queue =
- eventsWaitingForTransfer.get(connector.getPipeName());
+ eventsWaitingForTransfer.get(pipeName);
if (queue == null) {
LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.",
connector, event);
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event)
- .decreaseReferenceCount(WebSocketConnectorServer.class.getName(),
false);
- }
+ discardEvent(event);
return;
}
if (queue.size() >= 5) {
synchronized (queue) {
- while (queue.size() >= 5) {
+ while (queue.size() >= 5 && isQueueAvailable(pipeName, queue) &&
!isDroppedPipe(event)) {
try {
queue.wait();
} catch (InterruptedException e) {
@@ -323,12 +350,22 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
}
+ if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) {
+ discardEvent(event);
+ return;
+ }
+
queue.put(
new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(),
connector, event));
return;
}
}
+ if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) {
+ discardEvent(event);
+ return;
+ }
+
synchronized (queue) {
queue.put(new
EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
}
@@ -377,6 +414,11 @@ public class WebSocketConnectorServer extends
WebSocketServer {
final WebSocketSink connector = element.connector;
try {
+ if (isDroppedPipe(event)) {
+ discardEvent(event);
+ return;
+ }
+
ByteBuffer tabletBuffer;
if (event instanceof PipeRawTabletInsertionEvent) {
tabletBuffer = ((PipeRawTabletInsertionEvent)
event).convertToTablet().serialize();
@@ -387,7 +429,11 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
if (tabletBuffer == null) {
- connector.commit((EnrichedEvent) event);
+ if (isDroppedPipe(event)) {
+ discardEvent(event);
+ } else {
+ connector.commit((EnrichedEvent) event);
+ }
return;
}
@@ -398,11 +444,17 @@ public class WebSocketConnectorServer extends
WebSocketServer {
server.broadcast(payload,
Collections.singletonList(router.get(pipeName)));
+ if (isDroppedPipe(event)) {
+ discardEvent(event);
+ return;
+ }
+
final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
eventsWaitingForAck.get(pipeName);
if (eventId2EventMap == null) {
LOGGER.warn(
"The pipe {} was dropped so the event ack {} will be ignored.",
pipeName, eventId);
+ discardEvent(event);
return;
}
eventId2EventMap.put(eventId, new EventWaitingForAck(connector,
event));
@@ -410,13 +462,10 @@ public class WebSocketConnectorServer extends
WebSocketServer {
synchronized (server) {
final PriorityBlockingQueue<EventWaitingForTransfer> queue =
eventsWaitingForTransfer.get(pipeName);
- if (queue == null) {
+ if (queue == null || isDroppedPipe(event)) {
LOGGER.warn(
"The pipe {} was dropped so the event {} will be dropped.",
pipeName, eventId);
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event)
-
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
- }
+ discardEvent(event);
return;
}
@@ -465,4 +514,44 @@ public class WebSocketConnectorServer extends
WebSocketServer {
this.event = event;
}
}
+
+ private boolean discardIfMatches(
+ final Event event,
+ final String pipeNameToDrop,
+ final long creationTimeToDrop,
+ final int regionId) {
+ if (!(event instanceof EnrichedEvent)) {
+ return false;
+ }
+
+ final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
+ if (!pipeNameToDrop.equals(enrichedEvent.getPipeName())
+ || creationTimeToDrop != enrichedEvent.getCreationTime()
+ || regionId != enrichedEvent.getRegionId()) {
+ return false;
+ }
+
+ discardEvent(enrichedEvent);
+ return true;
+ }
+
+ private boolean isDroppedPipe(final Event event) {
+ return event instanceof EnrichedEvent
+ && droppedPipeTaskKeys.contains(
+ new Triple<>(
+ ((EnrichedEvent) event).getPipeName(),
+ ((EnrichedEvent) event).getCreationTime(),
+ ((EnrichedEvent) event).getRegionId()));
+ }
+
+ private boolean isQueueAvailable(
+ final String pipeName, final
PriorityBlockingQueue<EventWaitingForTransfer> queue) {
+ return eventsWaitingForTransfer.get(pipeName) == queue;
+ }
+
+ private void discardEvent(final Event event) {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent)
event).clearReferenceCount(WebSocketSink.class.getName());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index c89486bc2c8..40fccc12c99 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Optional;
-public class WebSocketSink implements PipeConnector {
+public class WebSocketSink implements PipeConnector,
PipeConnectorWithEventDiscard {
private static final Logger LOGGER =
LoggerFactory.getLogger(WebSocketSink.class);
@@ -164,6 +165,14 @@ public class WebSocketSink implements PipeConnector {
}
}
+ @Override
+ public void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ if (server != null) {
+ server.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+ }
+ }
+
public void commit(EnrichedEvent enrichedEvent) {
Optional.ofNullable(enrichedEvent)
.ifPresent(event ->
event.decreaseReferenceCount(WebSocketSink.class.getName(), true));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
index 98163697374..af871feaa7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
@@ -63,7 +63,8 @@ public class SubscriptionSinkSubtaskLifeCycle extends
PipeSinkSubtaskLifeCycle {
}
@Override
- public synchronized boolean deregister(final String pipeNameToDeregister,
int regionId) {
+ public synchronized boolean deregister(
+ final String pipeNameToDeregister, final long creationTimeToDeregister,
final int regionId) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 07def3ff4d3..f4547673eaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -167,7 +167,7 @@ public class SubscriptionSinkSubtaskManager {
final PipeSinkSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
- if (lifeCycle.deregister(pipeName, regionId)) {
+ if (lifeCycle.deregister(pipeName, creationTime, regionId)) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
new file mode 100644
index 00000000000..d54db821007
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task.connection;
+
+import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import
org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeRealtimePriorityBlockingQueue;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeEventCollectorTest {
+
+ @Test
+ public void
testCollectorDoesNotOfferEventsOfDroppedPipeToUnboundedPendingQueue() {
+ verifyCollectorDoesNotOfferEventsOfDroppedPipe(
+ new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter()));
+ }
+
+ @Test
+ public void
testCollectorDoesNotOfferEventsOfDroppedPipeToRealtimePendingQueue() {
+ verifyCollectorDoesNotOfferEventsOfDroppedPipe(new
PipeRealtimePriorityBlockingQueue());
+ }
+
+ private void verifyCollectorDoesNotOfferEventsOfDroppedPipe(
+ final UnboundedBlockingPendingQueue<Event> pendingQueue) {
+ pendingQueue.discardEventsOfPipe("pipe", 1L, 1);
+
+ final PipeEventCollector droppedPipeCollector =
+ new PipeEventCollector(pendingQueue, 1L, 1, false, false);
+ final PipeRawTabletInsertionEvent droppedPipeEvent =
+ createPipeRawTabletInsertionEvent("pipe", 1L);
+ droppedPipeCollector.collect(droppedPipeEvent);
+
+ Assert.assertTrue(droppedPipeEvent.isReleased());
+ Assert.assertEquals(0, pendingQueue.size());
+
+ final PipeEventCollector recreatedPipeCollector =
+ new PipeEventCollector(pendingQueue, 2L, 1, false, false);
+ final PipeRawTabletInsertionEvent recreatedPipeEvent =
+ createPipeRawTabletInsertionEvent("pipe", 2L);
+ recreatedPipeCollector.collect(recreatedPipeEvent);
+
+ Assert.assertFalse(recreatedPipeEvent.isReleased());
+ Assert.assertEquals(1, pendingQueue.size());
+
+ pendingQueue.discardAllEvents();
+ Assert.assertTrue(recreatedPipeEvent.isReleased());
+ }
+
+ private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+ final String pipeName, final long creationTime) {
+ final List<MeasurementSchema> schemaList =
+ Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+ final Tablet tablet = new Tablet("root.db.d1", schemaList, 1);
+ tablet.addTimestamp(0, 1L);
+ tablet.addValue("s1", 0, 1L);
+ return new PipeRawTabletInsertionEvent(
+ tablet, false, pipeName, creationTime, null, null, false);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
new file mode 100644
index 00000000000..ddfc699721b
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
+
+import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
+import org.apache.iotdb.pipe.api.PipeConnector;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.withSettings;
+
+public class PipeSinkSubtaskTest {
+
+ @Test
+ public void testDiscardEventsOfPipeDelegatesToConnector() {
+ final PipeConnector connector =
+ mock(
+ PipeConnector.class,
+
withSettings().extraInterfaces(PipeConnectorWithEventDiscard.class));
+ final UnboundedBlockingPendingQueue<?> pendingQueue =
mock(UnboundedBlockingPendingQueue.class);
+
+ final PipeSinkSubtask subtask =
+ Mockito.spy(
+ new PipeSinkSubtask(
+ "PipeSinkSubtaskTest",
+ System.currentTimeMillis(),
+ "data_test",
+ 0,
+ (UnboundedBlockingPendingQueue) pendingQueue,
+ connector));
+
+ try {
+ subtask.discardEventsOfPipe("pipe", 1L, 1);
+
+ verify((PipeConnectorWithEventDiscard)
connector).discardEventsOfPipe("pipe", 1L, 1);
+ } finally {
+ subtask.close();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index fae64308762..537ae6648b2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
@@ -28,14 +29,18 @@ import
org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
+import
org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer;
+import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.security.SecureRandom;
import java.util.Arrays;
@@ -103,6 +108,85 @@ public class PipeSinkTest {
}
}
+ @Test
+ public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws
Exception {
+ try (final IoTDBDataRegionAsyncSink connector = new
IoTDBDataRegionAsyncSink()) {
+ final PipeParameters parameters =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(
+ PipeSinkConstant.CONNECTOR_KEY,
+
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
+ put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY,
"127.0.0.1:6668");
+ }
+ });
+ connector.validate(new PipeParameterValidator(parameters));
+ connector.customize(
+ parameters,
+ new PipeTaskRuntimeConfiguration(new
PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1)));
+
+ final PipeRawTabletInsertionEvent droppedEvent =
+ createPipeRawTabletInsertionEvent("pipe", 1L, 1);
+ droppedEvent.increaseReferenceCount("test");
+ droppedEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 1L, 1,
-1), 1L);
+
+ connector.discardEventsOfPipe("pipe", 1L, 1);
+ connector.addFailureEventToRetryQueue(droppedEvent, new
PipeException("test"));
+
+ Assert.assertEquals(0, connector.getRetryEventQueueSize());
+ Assert.assertTrue(droppedEvent.isReleased());
+
+ final PipeRawTabletInsertionEvent recreatedPipeEvent =
+ createPipeRawTabletInsertionEvent("pipe", 2L, 1);
+ recreatedPipeEvent.increaseReferenceCount("test");
+ recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe",
2L, 1, -1), 1L);
+
+ connector.addFailureEventToRetryQueue(recreatedPipeEvent, new
PipeException("test"));
+
+ Assert.assertEquals(1, connector.getRetryEventQueueSize());
+ }
+ }
+
+ @Test
+ public void testWebSocketSinkDropDoesNotRequeueDroppedPipeEvents() {
+ final String pipeName = "pipe_" + System.nanoTime();
+ final WebSocketConnectorServer server =
WebSocketConnectorServer.getOrCreateInstance(0);
+ final WebSocketSink connector = Mockito.mock(WebSocketSink.class);
+ Mockito.when(connector.getPipeName()).thenReturn(pipeName);
+
+ server.register(connector);
+ try {
+ final PipeRawTabletInsertionEvent droppedEvent =
+ createPipeRawTabletInsertionEvent(pipeName, 1L, 1);
+ droppedEvent.increaseReferenceCount(WebSocketSink.class.getName());
+ droppedEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 1L,
1, -1), 1L);
+ server.addEvent(droppedEvent, connector);
+
+ server.discardEventsOfPipe(pipeName, 1L, 1);
+ Assert.assertTrue(droppedEvent.isReleased());
+
+ final PipeRawTabletInsertionEvent recreatedDroppedPipeEvent =
+ createPipeRawTabletInsertionEvent(pipeName, 1L, 1);
+
recreatedDroppedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName());
+ recreatedDroppedPipeEvent.setCommitterKeyAndCommitId(
+ new CommitterKey(pipeName, 1L, 1, -1), 2L);
+ server.addEvent(recreatedDroppedPipeEvent, connector);
+
+ Assert.assertTrue(recreatedDroppedPipeEvent.isReleased());
+
+ final PipeRawTabletInsertionEvent recreatedPipeEvent =
+ createPipeRawTabletInsertionEvent(pipeName, 2L, 1);
+ recreatedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName());
+ recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName,
2L, 1, -1), 3L);
+ server.addEvent(recreatedPipeEvent, connector);
+
+ Assert.assertFalse(recreatedPipeEvent.isReleased());
+ } finally {
+ server.unregister(connector);
+ }
+ }
+
@Test
public void testOpcUaSink() {
final List<MeasurementSchema> schemaList =
@@ -181,4 +265,15 @@ public class PipeSinkTest {
Assert.fail();
}
}
+
+ private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent(
+ final String pipeName, final long creationTime, final int regionId) {
+ final List<MeasurementSchema> schemaList =
+ Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64));
+ final Tablet tablet = new Tablet("root.db.d" + regionId, schemaList, 1);
+ tablet.addTimestamp(0, 1L);
+ tablet.addValue("s1", 0, 1L);
+ return new PipeRawTabletInsertionEvent(
+ tablet, false, pipeName, creationTime, null, null, false);
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 8773b03f9f3..8d920121363 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.agent.task.connection;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.pipe.api.event.Event;
@@ -27,7 +28,9 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -44,6 +47,10 @@ public abstract class BlockingPendingQueue<E extends Event> {
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+ // Pipe name, creation time, region id
+ protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
+ ConcurrentHashMap.newKeySet();
+
protected BlockingPendingQueue(
final BlockingQueue<E> pendingQueue, final PipeEventCounter
eventCounter) {
this.pendingQueue = pendingQueue;
@@ -51,7 +58,10 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public boolean offer(final E event) {
- checkBeforeOffer(event);
+ if (!checkBeforeOffer(event)) {
+ return false;
+ }
+
final boolean offered = pendingQueue.offer(event);
if (offered) {
eventCounter.increaseEventCount(event);
@@ -60,7 +70,9 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public boolean put(final E event) {
- checkBeforeOffer(event);
+ if (!checkBeforeOffer(event)) {
+ return false;
+ }
try {
pendingQueue.put(event);
eventCounter.increaseEventCount(event);
@@ -101,6 +113,7 @@ public abstract class BlockingPendingQueue<E extends Event>
{
isClosed.set(true);
pendingQueue.clear();
eventCounter.reset();
+ droppedPipeTaskKeys.clear();
}
/** DO NOT FORGET to set eventCounter to new value after invoking this
method. */
@@ -120,14 +133,17 @@ public abstract class BlockingPendingQueue<E extends
Event> {
return true;
});
eventCounter.reset();
+ droppedPipeTaskKeys.clear();
}
- public void discardEventsOfPipe(final String pipeNameToDrop, final int
regionId) {
+ public void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
pendingQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
- && regionId == ((EnrichedEvent) event).getRegionId()) {
+ && isEventFromPipe(
+ ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop,
regionId)) {
if (((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
}
@@ -157,9 +173,34 @@ public abstract class BlockingPendingQueue<E extends
Event> {
return eventCounter.getPipeHeartbeatEventCount();
}
- protected void checkBeforeOffer(final E event) {
- if (isClosed.get() && event instanceof EnrichedEvent) {
+ protected boolean checkBeforeOffer(final E event) {
+ final boolean shouldReject = isClosed.get() ||
isEventFromDroppedPipe(event);
+ if (shouldReject && event instanceof EnrichedEvent) {
((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName());
}
+ return !shouldReject;
+ }
+
+ protected static boolean isEventFromPipe(
+ final EnrichedEvent event,
+ final String pipeNameToDrop,
+ final long creationTimeToDrop,
+ final int regionId) {
+ return pipeNameToDrop.equals(event.getPipeName())
+ && creationTimeToDrop == event.getCreationTime()
+ && regionId == event.getRegionId();
+ }
+
+ protected boolean isEventFromDroppedPipe(final E event) {
+ return event instanceof EnrichedEvent
+ && ((EnrichedEvent) event).getPipeName() != null
+ && isPipeDropped(
+ ((EnrichedEvent) event).getPipeName(),
+ ((EnrichedEvent) event).getCreationTime(),
+ ((EnrichedEvent) event).getRegionId());
+ }
+
+ public boolean isPipeDropped(final String pipeName, final long creationTime,
final int regionId) {
+ return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime,
regionId));
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java
new file mode 100644
index 00000000000..275ccb20ea3
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure;
+
+import java.util.Objects;
+
+public class Triple<L, M, R> {
+ public final L first;
+ public final M second;
+ public final R third;
+
+ public Triple(final L first, final M second, final R third) {
+ this.first = first;
+ this.second = second;
+ this.third = third;
+ }
+
+ public L getFirst() {
+ return first;
+ }
+
+ public M getSecond() {
+ return second;
+ }
+
+ public R getThird() {
+ return third;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Triple<?, ?, ?> triple = (Triple<?, ?, ?>) o;
+ return first.equals(triple.first) && second.equals(triple.second) &&
third.equals(triple.third);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(first, second, third);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 66f3eda6a1a..2f2d54e7b4f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -139,7 +139,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_RATE_LIMIT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_SKIP_IF_KEY;
-public abstract class IoTDBSink implements PipeConnector {
+public abstract class IoTDBSink implements PipeConnector,
PipeConnectorWithEventDiscard {
private static final String PARSE_URL_ERROR_FORMATTER =
"Exception occurred while parsing node urls from target servers: {}";
@@ -621,7 +621,8 @@ public abstract class IoTDBSink implements PipeConnector {
* When a pipe is dropped, the connector maybe reused and will not be
closed. We need to discard
* its batched or queued events in the output pipe connector.
*/
- public synchronized void discardEventsOfPipe(final String pipeName, final
int regionId) {
+ public synchronized void discardEventsOfPipe(
+ final String pipeName, final long creationTime, final int regionId) {
// Do nothing by default
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
new file mode 100644
index 00000000000..ab4dbcf9075
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.protocol;
+
+public interface PipeConnectorWithEventDiscard {
+
+ void discardEventsOfPipe(String pipeName, long creationTime, int regionId);
+}