This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 632d87e2dbc Subscription: fully managed tsfile parsing process for
tsfile format topic (#15524)
632d87e2dbc is described below
commit 632d87e2dbc1b35a4583a0dfee5a5841c33be4ee
Author: VGalaxies <[email protected]>
AuthorDate: Mon May 19 16:05:40 2025 +0800
Subscription: fully managed tsfile parsing process for tsfile format topic
(#15524)
---
.../task/builder/PipeDataNodeTaskBuilder.java | 11 +--------
.../agent/task/connection/PipeEventCollector.java | 2 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 6 +++++
.../batch/SubscriptionPipeTsFileEventBatch.java | 28 ++++++++++++++++++----
4 files changed, 32 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 6bcea653d64..30c48ef5b7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -51,7 +51,6 @@ import java.util.Map;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
@@ -152,15 +151,7 @@ public class PipeDataNodeTaskBuilder {
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
.equals(CONNECTOR_FORMAT_TABLET_VALUE),
- PipeType.SUBSCRIPTION.equals(pipeType)
- &&
- // should not skip parsing when the format is tsfile
- !pipeStaticMeta
- .getConnectorParameters()
- .getStringOrDefault(
- Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
- CONNECTOR_FORMAT_HYBRID_VALUE)
- .equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
+ PipeType.SUBSCRIPTION.equals(pipeType));
return new PipeDataNodeTask(
pipeStaticMeta.getPipeName(), regionId, extractorStage,
processorStage, connectorStage);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 3bc4553c852..e385a8d1037 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -149,7 +149,7 @@ public class PipeEventCollector implements EventCollector {
}
}
- private boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent
sourceEvent) {
+ public static boolean canSkipParsing4TsFileEvent(final
PipeTsFileInsertionEvent sourceEvent) {
return !sourceEvent.shouldParseTimeOrPattern()
|| (sourceEvent.isTableModelEvent()
&& (sourceEvent.getTablePattern() == null
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 14e8c27902c..49ac3f6b8dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.subscription.broker;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -245,6 +247,10 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
@Override
protected boolean onEvent(final TsFileInsertionEvent event) {
+ if
(!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent)
event)) {
+ return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+ }
+
final SubscriptionCommitContext commitContext =
generateSubscriptionCommitContext();
final SubscriptionEvent ev =
new SubscriptionEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 6f56ed3887b..7902a069aa0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -82,10 +83,29 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
@Override
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
- LOGGER.warn(
- "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {}
when batching.",
- this,
- event);
+ // TODO: parse tsfile event on the fly like
SubscriptionPipeTabletEventBatch
+ try {
+ for (final TabletInsertionEvent parsedEvent :
event.toTabletInsertionEvents()) {
+ if (!((PipeRawTabletInsertionEvent) parsedEvent)
+ .increaseReferenceCount(this.getClass().getName())) {
+ LOGGER.warn(
+ "SubscriptionPipeTsFileEventBatch: Failed to increase the
reference count of event {}, skipping it.",
+ ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
+ } else {
+ try {
+ batch.onEvent(parsedEvent);
+ } catch (final Exception ignored) {
+ // no exceptions will be thrown
+ }
+ }
+ }
+ } finally {
+ try {
+ event.close();
+ } catch (final Exception ignored) {
+ // no exceptions will be thrown
+ }
+ }
}
@Override