This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e3dda34e3aa Subscription: fully managed tsfile parsing process for 
tsfile format topic (#15524) (#15529)
e3dda34e3aa is described below

commit e3dda34e3aa3a829fef0ef98e5c632268cd505aa
Author: VGalaxies <[email protected]>
AuthorDate: Tue May 20 15:35:48 2025 +0800

    Subscription: fully managed tsfile parsing process for tsfile format topic 
(#15524) (#15529)
---
 .../task/builder/PipeDataNodeTaskBuilder.java      | 11 +--------
 .../agent/task/connection/PipeEventCollector.java  |  6 ++++-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  6 +++++
 .../batch/SubscriptionPipeTsFileEventBatch.java    | 28 ++++++++++++++++++----
 4 files changed, 36 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 4b78fc49c30..5e7901cb7c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -49,7 +49,6 @@ import java.util.Map;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
@@ -146,15 +145,7 @@ public class PipeDataNodeTaskBuilder {
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
                 .equals(CONNECTOR_FORMAT_TABLET_VALUE),
-            PipeType.SUBSCRIPTION.equals(pipeType)
-                &&
-                // should not skip parsing when the format is tsfile
-                !pipeStaticMeta
-                    .getConnectorParameters()
-                    .getStringOrDefault(
-                        Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
-                        CONNECTOR_FORMAT_HYBRID_VALUE)
-                    .equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
+            PipeType.SUBSCRIPTION.equals(pipeType));
 
     return new PipeDataNodeTask(
         pipeStaticMeta.getPipeName(), regionId, extractorStage, 
processorStage, connectorStage);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 86a4d00741e..386855220e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -134,7 +134,7 @@ public class PipeEventCollector implements EventCollector {
       return;
     }
 
-    if (!forceTabletFormat && !sourceEvent.shouldParseTimeOrPattern()) {
+    if (!forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
       collectEvent(sourceEvent);
       return;
     }
@@ -148,6 +148,10 @@ public class PipeEventCollector implements EventCollector {
     }
   }
 
+  public static boolean canSkipParsing4TsFileEvent(final 
PipeTsFileInsertionEvent sourceEvent) {
+    return !sourceEvent.shouldParseTimeOrPattern();
+  }
+
   private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent 
parsedEvent) {
     if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
       hasNoGeneratedEvent = false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 1b29d1d97c3..6217af0123b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -245,6 +247,10 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
 
   @Override
   protected boolean onEvent(final TsFileInsertionEvent event) {
+    if 
(!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) 
event)) {
+      return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+    }
+
     final SubscriptionCommitContext commitContext = 
generateSubscriptionCommitContext();
     final SubscriptionEvent ev =
         new SubscriptionEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 47be439912b..dae3655ca7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -81,10 +82,29 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
 
   @Override
   protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
-    LOGGER.warn(
-        "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} 
when batching.",
-        this,
-        event);
+    // TODO: parse tsfile event on the fly like 
SubscriptionPipeTabletEventBatch
+    try {
+      for (final TabletInsertionEvent parsedEvent : 
event.toTabletInsertionEvents()) {
+        if (!((PipeRawTabletInsertionEvent) parsedEvent)
+            .increaseReferenceCount(this.getClass().getName())) {
+          LOGGER.warn(
+              "SubscriptionPipeTsFileEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
+              ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
+        } else {
+          try {
+            batch.onEvent(parsedEvent);
+          } catch (final Exception ignored) {
+            // no exceptions will be thrown
+          }
+        }
+      }
+    } finally {
+      try {
+        event.close();
+      } catch (final Exception ignored) {
+        // no exceptions will be thrown
+      }
+    }
   }
 
   @Override

Reply via email to