This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch pipe-serialize-sink-by-region in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3789288816534be05e68810fe3d7abb8d61be655 Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 16 09:51:02 2026 +0800 Pipe: serialize sink transfers by region --- .../manual/enhanced/IoTDBPipeSinkParallelIT.java | 1 + .../auto/basic/IoTDBPipeSinkParallelIT.java | 1 + .../task/subtask/sink/PipeSinkSubtaskManager.java | 46 ++++++++++----- .../thrift/async/IoTDBDataRegionAsyncSink.java | 49 +++++++++++++++- .../apache/iotdb/db/pipe/sink/PipeSinkTest.java | 65 ++++++++++++++++++++++ .../pipe/config/constant/PipeSinkConstant.java | 4 ++ .../commons/pipe/sink/protocol/IoTDBSink.java | 11 ++++ 7 files changed, 159 insertions(+), 18 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java index 85f3b93aa76..ae466349d72 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkParallelIT.java @@ -80,6 +80,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeTableModelDualManualIT connectorAttributes.put("connector.batch.enable", "true"); connectorAttributes.put("connector.ip", receiverIp); connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + connectorAttributes.put("connector.serialize-by-region", "false"); connectorAttributes.put("connector.parallel.tasks", "3"); final TSStatus status = diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java index c21e6456ef1..df8e27a8546 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSinkParallelIT.java @@ -71,6 +71,7 @@ public class IoTDBPipeSinkParallelIT extends AbstractPipeDualTreeModelAutoIT { sinkAttributes.put("sink.batch.enable", "false"); sinkAttributes.put("sink.ip", receiverIp); sinkAttributes.put("sink.port", Integer.toString(receiverPort)); + sinkAttributes.put("sink.serialize-by-region", "false"); sinkAttributes.put("sink.parallel.tasks", "3"); final TSStatus status = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 01552eec5ae..a82e9432c09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -93,30 +93,42 @@ public class PipeSinkSubtaskManager { final int sinkNum; boolean realTimeFirst = false; + boolean serializeByRegion = false; String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters); if (isDataRegionSink) { - sinkNum = - pipeSinkParameters.getIntOrDefault( + serializeByRegion = + pipeSinkParameters.getBooleanOrDefault( Arrays.asList( - PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, - PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), - PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains( - pipeSinkParameters - .getStringOrDefault( - Arrays.asList( - PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), - BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName()) - .toLowerCase()) - ? 1 - : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, + PipeSinkConstant.SINK_SERIALIZE_BY_REGION_KEY), + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE); + sinkNum = + serializeByRegion + ? 1 + : pipeSinkParameters.getIntOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, + PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), + PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains( + pipeSinkParameters + .getStringOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), + BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName()) + .toLowerCase()) + ? 1 + : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); realTimeFirst = pipeSinkParameters.getBooleanOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, PipeSinkConstant.SINK_REALTIME_FIRST_KEY), PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE); - attributeSortedString = "data_" + attributeSortedString; + attributeSortedString = + serializeByRegion + ? "data_region_" + environment.getRegionId() + "_" + attributeSortedString + : "data_" + attributeSortedString; } else { // Do not allow parallel tasks for schema region connectors // to avoid the potential disorder of the schema region data transfer @@ -124,7 +136,11 @@ public class PipeSinkSubtaskManager { attributeSortedString = "schema_" + attributeSortedString; } final String attributeDisplayStringWithPrefix = - isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString; + isDataRegionSink + ? serializeByRegion + ? "data_region_" + environment.getRegionId() + "_" + attributeDisplayString + : "data_" + attributeDisplayString + : "schema_" + attributeDisplayString; environment.setAttributeSortedString(attributeDisplayStringWithPrefix); if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index b8b169b1f6a..4db4942139b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -129,6 +129,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap<>(); + private final Object pendingHandlersMonitor = new Object(); private final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); @@ -207,6 +208,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { @Override public void heartbeat() throws Exception { + waitForNoPendingHandlerIfNecessary(); if (!isClosed()) { syncSink.heartbeat(); } @@ -214,6 +216,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { + waitForNoPendingHandlerIfNecessary(); transferQueuedEventsIfNecessary(false); if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) @@ -346,6 +349,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler) { AsyncPipeDataTransferServiceClient client = null; try { + waitForNoPendingHandlerIfNecessary(); client = clientManager.borrowClient(endPoint); pipeTransferTabletBatchEventHandler.transfer(client); } catch (final Exception ex) { @@ -359,6 +363,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { final PipeTransferTabletInsertNodeEventHandler pipeTransferInsertNodeReqHandler) { AsyncPipeDataTransferServiceClient client = null; try { + waitForNoPendingHandlerIfNecessary(); client = clientManager.borrowClient(deviceId); pipeTransferInsertNodeReqHandler.transfer(client); } catch (final Exception ex) { @@ -371,6 +376,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { final String deviceId, final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler) { AsyncPipeDataTransferServiceClient client = null; try { + waitForNoPendingHandlerIfNecessary(); client = clientManager.borrowClient(deviceId); pipeTransferTabletReqHandler.transfer(client); } catch (final Exception ex) { @@ -381,6 +387,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { @Override public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception { + waitForNoPendingHandlerIfNecessary(); transferQueuedEventsIfNecessary(false); transferBatchedEventsIfNecessary(); @@ -442,6 +449,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { } private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) { + waitForNoPendingHandlerIfNecessary(); transferTsFileCounter.incrementAndGet(); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync( @@ -460,7 +468,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { }, transferTsFileClientManager.getExecutor()); - if (PipeConfig.getInstance().isTransferTsFileSync()) { + if (isSerializeByRegion || PipeConfig.getInstance().isTransferTsFileSync()) { try { completableFuture.get(); } catch (final Exception e) { @@ -493,6 +501,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { @Override public void transfer(final Event event) throws Exception { + waitForNoPendingHandlerIfNecessary(); transferQueuedEventsIfNecessary(true); transferBatchedEventsIfNecessary(); @@ -506,6 +515,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { return; } + waitForNoPendingHandlerIfNecessary(); syncSink.transfer(event); } @@ -620,6 +630,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { } } + waitForNoPendingHandlerIfNecessary(); + // Stop retrying if the execution time exceeds the threshold for better realtime performance if (System.currentTimeMillis() - retryStartTime > PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) { @@ -798,6 +810,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { // synchronized to avoid close connector when transfer event public synchronized void close() { isClosed.set(true); + synchronized (pendingHandlersMonitor) { + pendingHandlersMonitor.notifyAll(); + } syncSink.close(); @@ -866,7 +881,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { } public void trackHandler(final PipeTransferTrackableHandler handler) { - pendingHandlers.put(handler, handler); + synchronized (pendingHandlersMonitor) { + pendingHandlers.put(handler, handler); + pendingHandlersMonitor.notifyAll(); + } } public void eliminateHandler( @@ -875,13 +893,38 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { handler.closeClient(); } handler.close(); - pendingHandlers.remove(handler); + synchronized (pendingHandlersMonitor) { + pendingHandlers.remove(handler); + pendingHandlersMonitor.notifyAll(); + } } public boolean hasPendingHandlers() { return !pendingHandlers.isEmpty(); } + public boolean isSerializeByRegion() { + return isSerializeByRegion; + } + + private void waitForNoPendingHandlerIfNecessary() { + if (!isSerializeByRegion) { + return; + } + + synchronized (pendingHandlersMonitor) { + while (!isClosed.get() && !pendingHandlers.isEmpty()) { + try { + pendingHandlersMonitor.wait(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PipeException( + "Interrupted when waiting for previous async sink transfer to finish.", e); + } + } + } + } + public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) { this.transferTsFileCounter = transferTsFileCounter; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index cf311639ee9..00d2a40aa0f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -109,6 +109,71 @@ public class PipeSinkTest { } } + @Test + public void testAsyncSinkSerializeByRegionConfig() throws Exception { + try (IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap<String, String>() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + } + }); + + connector.validate(new PipeParameterValidator(parameters)); + connector.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + Assert.assertTrue(connector.isSerializeByRegion()); + } + + try (IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap<String, String>() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + put(PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, "false"); + } + }); + + connector.validate(new PipeParameterValidator(parameters)); + connector.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + Assert.assertFalse(connector.isSerializeByRegion()); + } + + try (IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap<String, String>() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + put(PipeSinkConstant.SINK_SERIALIZE_BY_REGION_KEY, "false"); + } + }); + + connector.validate(new PipeParameterValidator(parameters)); + connector.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + Assert.assertFalse(connector.isSerializeByRegion()); + } + } + @Test public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception { try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index 058b17e2f4f..4fd0f1e82b8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -69,6 +69,10 @@ public class PipeSinkConstant { public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first"; public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true; + public static final String CONNECTOR_SERIALIZE_BY_REGION_KEY = "connector.serialize-by-region"; + public static final String SINK_SERIALIZE_BY_REGION_KEY = "sink.serialize-by-region"; + public static final boolean CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE = true; + public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable"; public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = "sink.batch.enable"; public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index b5662aeec2c..1ea8410c0e0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -192,6 +192,7 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent protected String sinkTaskId; protected Timer compressionTimer; protected boolean isRealtimeFirst; + protected boolean isSerializeByRegion; @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -498,6 +499,16 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE); LOGGER.info( "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst); + isSerializeByRegion = + parameters.getBooleanOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, + PipeSinkConstant.SINK_SERIALIZE_BY_REGION_KEY), + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_DEFAULT_VALUE); + LOGGER.info( + "IoTDBSink {} = {}", + PipeSinkConstant.CONNECTOR_SERIALIZE_BY_REGION_KEY, + isSerializeByRegion); } protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters parameters)
