This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e3dda34e3aa Subscription: fully managed tsfile parsing process for tsfile format topic (#15524) (#15529)
e3dda34e3aa is described below
commit e3dda34e3aa3a829fef0ef98e5c632268cd505aa
Author: VGalaxies <[email protected]>
AuthorDate: Tue May 20 15:35:48 2025 +0800
Subscription: fully managed tsfile parsing process for tsfile format topic (#15524) (#15529)
---
.../task/builder/PipeDataNodeTaskBuilder.java | 11 +--------
.../agent/task/connection/PipeEventCollector.java | 6 ++++-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 6 +++++
.../batch/SubscriptionPipeTsFileEventBatch.java | 28 ++++++++++++++++++----
4 files changed, 36 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 4b78fc49c30..5e7901cb7c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -49,7 +49,6 @@ import java.util.Map;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
@@ -146,15 +145,7 @@ public class PipeDataNodeTaskBuilder {
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
.equals(CONNECTOR_FORMAT_TABLET_VALUE),
- PipeType.SUBSCRIPTION.equals(pipeType)
- &&
- // should not skip parsing when the format is tsfile
- !pipeStaticMeta
- .getConnectorParameters()
- .getStringOrDefault(
- Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
- CONNECTOR_FORMAT_HYBRID_VALUE)
- .equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
+ PipeType.SUBSCRIPTION.equals(pipeType));
return new PipeDataNodeTask(
pipeStaticMeta.getPipeName(), regionId, extractorStage,
processorStage, connectorStage);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 86a4d00741e..386855220e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -134,7 +134,7 @@ public class PipeEventCollector implements EventCollector {
return;
}
- if (!forceTabletFormat && !sourceEvent.shouldParseTimeOrPattern()) {
+ if (!forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
collectEvent(sourceEvent);
return;
}
@@ -148,6 +148,10 @@ public class PipeEventCollector implements EventCollector {
}
}
+ public static boolean canSkipParsing4TsFileEvent(final
PipeTsFileInsertionEvent sourceEvent) {
+ return !sourceEvent.shouldParseTimeOrPattern();
+ }
+
private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent
parsedEvent) {
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
hasNoGeneratedEvent = false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 1b29d1d97c3..6217af0123b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.subscription.broker;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -245,6 +247,10 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
@Override
protected boolean onEvent(final TsFileInsertionEvent event) {
+ if
(!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent)
event)) {
+ return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+ }
+
final SubscriptionCommitContext commitContext =
generateSubscriptionCommitContext();
final SubscriptionEvent ev =
new SubscriptionEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 47be439912b..dae3655ca7a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.event.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -81,10 +82,29 @@ public class SubscriptionPipeTsFileEventBatch extends
SubscriptionPipeEventBatch
@Override
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
- LOGGER.warn(
- "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {}
when batching.",
- this,
- event);
+ // TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch
+ try {
+ for (final TabletInsertionEvent parsedEvent :
event.toTabletInsertionEvents()) {
+ if (!((PipeRawTabletInsertionEvent) parsedEvent)
+ .increaseReferenceCount(this.getClass().getName())) {
+ LOGGER.warn(
+ "SubscriptionPipeTsFileEventBatch: Failed to increase the
reference count of event {}, skipping it.",
+ ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
+ } else {
+ try {
+ batch.onEvent(parsedEvent);
+ } catch (final Exception ignored) {
+ // no exceptions will be thrown
+ }
+ }
+ }
+ } finally {
+ try {
+ event.close();
+ } catch (final Exception ignored) {
+ // no exceptions will be thrown
+ }
+ }
}
@Override