This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch drop-pipe-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 543efacc830755c006836575e65fb2e1ab7ed09d Author: Caideyipi <[email protected]> AuthorDate: Mon Apr 27 12:22:13 2026 +0800 wd --- .../agent/task/subtask/sink/PipeSinkSubtask.java | 5 +- .../websocket/WebSocketConnectorServer.java | 143 +++++++++++++++++---- .../sink/protocol/websocket/WebSocketSink.java | 11 +- .../task/subtask/sink/PipeSinkSubtaskTest.java | 61 +++++++++ .../apache/iotdb/db/pipe/sink/PipeSinkTest.java | 42 ++++++ .../commons/pipe/sink/protocol/IoTDBSink.java | 2 +- .../protocol/PipeConnectorWithEventDiscard.java | 25 ++++ 7 files changed, 259 insertions(+), 30 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index f6008822e61..dd31d5b5fd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; @@ -248,8 +249,8 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { decreaseHighPriorityTaskCount(); } - if (outputPipeSink instanceof IoTDBSink) { - ((IoTDBSink) outputPipeSink) + if (outputPipeSink instanceof PipeConnectorWithEventDiscard) { + ((PipeConnectorWithEventDiscard) outputPipeSink) .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 491d40a1e45..6b402b27026 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,6 +57,7 @@ public class WebSocketConnectorServer extends WebSocketServer { // Map<pipeName, Map<eventId, Tuple<connector, event>>> private final ConcurrentHashMap<String, ConcurrentHashMap<Long, EventWaitingForAck>> eventsWaitingForAck = new ConcurrentHashMap<>(); + private final Set<String> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private final BidiMap<String, WebSocket> router = new DualTreeBidiMap<String, WebSocket>(null, Comparator.comparing(Object::hashCode)) {}; @@ -97,13 +99,8 @@ public class WebSocketConnectorServer extends WebSocketServer { eventWrappers = new ArrayList<>(eventTransferQueue); eventTransferQueue.clear(); } - eventWrappers.forEach( - (eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + eventWrappers.forEach(eventWrapper -> discardEvent(eventWrapper.event)); + eventWrappers.clear(); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } @@ -113,13 +110,35 @@ public class WebSocketConnectorServer extends WebSocketServer { if (eventsWaitingForAck.containsKey(pipeName)) { eventsWaitingForAck .remove(pipeName) - .forEach( - (eventId, eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); + } + + droppedPipeTaskKeys.removeIf(key -> key.startsWith(pipeName + "_")); + } + + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); + + final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue = + eventsWaitingForTransfer.get(pipeNameToDrop); + if (eventTransferQueue != null) { + eventTransferQueue.removeIf( + eventWrapper -> + discardIfMatches( + eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + synchronized (eventTransferQueue) { + eventTransferQueue.notifyAll(); + } + } + + final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap = + eventsWaitingForAck.get(pipeNameToDrop); + if (eventId2EventMap != null) { + eventId2EventMap.entrySet().removeIf( + entry -> + discardIfMatches( + entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); } } @@ -300,21 +319,24 @@ public class WebSocketConnectorServer extends WebSocketServer { } public void addEvent(Event event, WebSocketSink connector) { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + + final String pipeName = connector.getPipeName(); final PriorityBlockingQueue<EventWaitingForTransfer> queue = - eventsWaitingForTransfer.get(connector.getPipeName()); + eventsWaitingForTransfer.get(pipeName); if (queue == null) { LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", connector, event); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } if (queue.size() >= 5) { synchronized (queue) { - while (queue.size() >= 5) { + while (queue.size() >= 5 && isQueueAvailable(pipeName, queue) && !isDroppedPipe(event)) { try { queue.wait(); } catch (InterruptedException e) { @@ -323,15 +345,27 @@ public class WebSocketConnectorServer extends WebSocketServer { } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + queue.put( new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); return; } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + synchronized (queue) { queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } + + queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } private class TransferThread extends Thread { @@ -377,6 +411,11 @@ public class WebSocketConnectorServer extends WebSocketServer { final WebSocketSink connector = element.connector; try { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + ByteBuffer tabletBuffer; if (event instanceof PipeRawTabletInsertionEvent) { tabletBuffer = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize(); @@ -387,7 +426,11 @@ public class WebSocketConnectorServer extends WebSocketServer { } if (tabletBuffer == null) { - connector.commit((EnrichedEvent) event); + if (isDroppedPipe(event)) { + discardEvent(event); + } else { + connector.commit((EnrichedEvent) event); + } return; } @@ -398,11 +441,17 @@ public class WebSocketConnectorServer extends WebSocketServer { server.broadcast(payload, Collections.singletonList(router.get(pipeName))); + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap = eventsWaitingForAck.get(pipeName); if (eventId2EventMap == null) { LOGGER.warn( "The pipe {} was dropped so the event ack {} will be ignored.", pipeName, eventId); + discardEvent(event); return; } eventId2EventMap.put(eventId, new EventWaitingForAck(connector, event)); @@ -410,13 +459,10 @@ public class WebSocketConnectorServer extends WebSocketServer { synchronized (server) { final PriorityBlockingQueue<EventWaitingForTransfer> queue = eventsWaitingForTransfer.get(pipeName); - if (queue == null) { + if (queue == null || isDroppedPipe(event)) { LOGGER.warn( "The pipe {} was dropped so the event {} will be dropped.", pipeName, eventId); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } @@ -465,4 +511,49 @@ public class WebSocketConnectorServer extends WebSocketServer { this.event = event; } } + + private boolean discardIfMatches( + final Event event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + if (!(event instanceof EnrichedEvent)) { + return false; + } + + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!pipeNameToDrop.equals(enrichedEvent.getPipeName()) + || creationTimeToDrop != enrichedEvent.getCreationTime() + || regionId != enrichedEvent.getRegionId()) { + return false; + } + + discardEvent(enrichedEvent); + return true; + } + + private boolean isDroppedPipe(final Event event) { + return event instanceof EnrichedEvent + && droppedPipeTaskKeys.contains( + generatePipeTaskKey( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId())); + } + + private boolean isQueueAvailable( + final String pipeName, final PriorityBlockingQueue<EventWaitingForTransfer> queue) { + return eventsWaitingForTransfer.get(pipeName) == queue; + } + + private static String generatePipeTaskKey( + final String pipeName, final long creationTime, final int regionId) { + return pipeName + "_" + creationTime + "_" + regionId; + } + + private void discardEvent(final Event event) { + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event).clearReferenceCount(WebSocketSink.class.getName()); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index 4e593e673fd..dadeee8053d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -41,7 +42,7 @@ import java.util.Arrays; import java.util.Optional; @TreeModel -public class WebSocketSink implements PipeConnector { +public class WebSocketSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketSink.class); @@ -166,6 +167,14 @@ public class WebSocketSink implements PipeConnector { } } + @Override + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (server != null) { + server.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + } + } + public void commit(EnrichedEvent enrichedEvent) { Optional.ofNullable(enrichedEvent) .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java new file mode 100644 index 00000000000..ddfc699721b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.subtask.sink; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.pipe.api.PipeConnector; + +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.withSettings; + +public class PipeSinkSubtaskTest { + + @Test + public void testDiscardEventsOfPipeDelegatesToConnector() { + final PipeConnector connector = + mock( + PipeConnector.class, + withSettings().extraInterfaces(PipeConnectorWithEventDiscard.class)); + final UnboundedBlockingPendingQueue<?> pendingQueue = mock(UnboundedBlockingPendingQueue.class); + + final PipeSinkSubtask subtask = + Mockito.spy( + new PipeSinkSubtask( + "PipeSinkSubtaskTest", + System.currentTimeMillis(), + "data_test", + 0, + (UnboundedBlockingPendingQueue) pendingQueue, + connector)); + + try { + subtask.discardEventsOfPipe("pipe", 1L, 1); + + verify((PipeConnectorWithEventDiscard) connector).discardEventsOfPipe("pipe", 1L, 1); + } finally { + subtask.close(); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index 7dfd0446038..3ad262130ff 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -29,6 +29,8 @@ import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -39,6 +41,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.security.SecureRandom; import java.util.Arrays; @@ -146,6 +149,45 @@ public class PipeSinkTest { } } + @Test + public void testWebSocketSinkDropDoesNotRequeueDroppedPipeEvents() { + final String pipeName = "pipe_" + System.nanoTime(); + final WebSocketConnectorServer server = WebSocketConnectorServer.getOrCreateInstance(0); + final WebSocketSink connector = Mockito.mock(WebSocketSink.class); + Mockito.when(connector.getPipeName()).thenReturn(pipeName); + + server.register(connector); + try { + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + droppedEvent.increaseReferenceCount(WebSocketSink.class.getName()); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 1L, 1, -1), 1L); + server.addEvent(droppedEvent, connector); + + server.discardEventsOfPipe(pipeName, 1L, 1); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedDroppedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + recreatedDroppedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedDroppedPipeEvent.setCommitterKeyAndCommitId( + new CommitterKey(pipeName, 1L, 1, -1), 2L); + server.addEvent(recreatedDroppedPipeEvent, connector); + + Assert.assertTrue(recreatedDroppedPipeEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 2L, 1); + recreatedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 2L, 1, -1), 3L); + server.addEvent(recreatedPipeEvent, connector); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + } finally { + server.unregister(connector); + } + } + @Test public void testOpcUaSink() { final List<IMeasurementSchema> schemaList = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 7de06376b6d..ebb3d90bc8c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -148,7 +148,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN @TreeModel @TableModel -public abstract class IoTDBSink implements PipeConnector { +public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final String PARSE_URL_ERROR_FORMATTER = "Exception occurred while parsing node urls from target servers: {}"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java new file mode 100644 index 00000000000..ab4dbcf9075 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.protocol; + +public interface PipeConnectorWithEventDiscard { + + void discardEventsOfPipe(String pipeName, long creationTime, int regionId); +}
