This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6de423bd05bbfbfefbd5687a036fc1338bf4d45f Author: VGalaxies <[email protected]> AuthorDate: Mon May 19 16:05:40 2025 +0800 Subscription: fully managed tsfile parsing process for tsfile format topic (#15524) (cherry picked from commit 632d87e2dbc1b35a4583a0dfee5a5841c33be4ee) --- .../task/builder/PipeDataNodeTaskBuilder.java | 11 +-------- .../agent/task/connection/PipeEventCollector.java | 2 +- .../broker/SubscriptionPrefetchingTsFileQueue.java | 6 +++++ .../batch/SubscriptionPipeTsFileEventBatch.java | 28 ++++++++++++++++++---- 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index 5be9dc8ab64..c8da29ae11d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -49,7 +49,6 @@ import java.util.Map; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY; @@ -149,15 +148,7 @@ public class PipeDataNodeTaskBuilder { Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY), CONNECTOR_FORMAT_HYBRID_VALUE) .equals(CONNECTOR_FORMAT_TABLET_VALUE), - PipeType.SUBSCRIPTION.equals(pipeType) - && - // should not skip parsing when the format is tsfile - !pipeStaticMeta - .getConnectorParameters() - .getStringOrDefault( - Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY), - CONNECTOR_FORMAT_HYBRID_VALUE) - .equals(CONNECTOR_FORMAT_TS_FILE_VALUE)); + PipeType.SUBSCRIPTION.equals(pipeType)); return new PipeDataNodeTask( pipeStaticMeta.getPipeName(), regionId, extractorStage, processorStage, connectorStage); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index 3bc4553c852..e385a8d1037 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -149,7 +149,7 @@ public class PipeEventCollector implements EventCollector { } } - private boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent sourceEvent) { + public static boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent sourceEvent) { return !sourceEvent.shouldParseTimeOrPattern() || (sourceEvent.isTableModelEvent() && (sourceEvent.getTablePattern() == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index 14e8c27902c..49ac3f6b8dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.subscription.broker; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; @@ -245,6 +247,10 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ @Override protected boolean onEvent(final TsFileInsertionEvent event) { + if (!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) event)) { + return batches.onEvent((EnrichedEvent) event, this::prefetchEvent); + } + final SubscriptionCommitContext commitContext = generateSubscriptionCommitContext(); final SubscriptionEvent ev = new SubscriptionEvent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 6f56ed3887b..7902a069aa0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.event.batch; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents; @@ -82,10 +83,29 @@ public class SubscriptionPipeTsFileEventBatch extends SubscriptionPipeEventBatch @Override protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { - LOGGER.warn( - "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} when batching.", - this, - event); + // TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch + try { + for (final TabletInsertionEvent parsedEvent : event.toTabletInsertionEvents()) { + if (!((PipeRawTabletInsertionEvent) parsedEvent) + .increaseReferenceCount(this.getClass().getName())) { + LOGGER.warn( + "SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.", + ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage()); + } else { + try { + batch.onEvent(parsedEvent); + } catch (final Exception ignored) { + // no exceptions will be thrown + } + } + } + } finally { + try { + event.close(); + } catch (final Exception ignored) { + // no exceptions will be thrown + } + } } @Override
