This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6de423bd05bbfbfefbd5687a036fc1338bf4d45f
Author: VGalaxies <[email protected]>
AuthorDate: Mon May 19 16:05:40 2025 +0800

    Subscription: fully managed tsfile parsing process for tsfile format topic 
(#15524)
    
    (cherry picked from commit 632d87e2dbc1b35a4583a0dfee5a5841c33be4ee)
---
 .../task/builder/PipeDataNodeTaskBuilder.java      | 11 +--------
 .../agent/task/connection/PipeEventCollector.java  |  2 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  6 +++++
 .../batch/SubscriptionPipeTsFileEventBatch.java    | 28 ++++++++++++++++++----
 4 files changed, 32 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 5be9dc8ab64..c8da29ae11d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -49,7 +49,6 @@ import java.util.Map;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
@@ -149,15 +148,7 @@ public class PipeDataNodeTaskBuilder {
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
                 .equals(CONNECTOR_FORMAT_TABLET_VALUE),
-            PipeType.SUBSCRIPTION.equals(pipeType)
-                &&
-                // should not skip parsing when the format is tsfile
-                !pipeStaticMeta
-                    .getConnectorParameters()
-                    .getStringOrDefault(
-                        Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
-                        CONNECTOR_FORMAT_HYBRID_VALUE)
-                    .equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
+            PipeType.SUBSCRIPTION.equals(pipeType));
 
     return new PipeDataNodeTask(
         pipeStaticMeta.getPipeName(), regionId, extractorStage, 
processorStage, connectorStage);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 3bc4553c852..e385a8d1037 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -149,7 +149,7 @@ public class PipeEventCollector implements EventCollector {
     }
   }
 
-  private boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent 
sourceEvent) {
+  public static boolean canSkipParsing4TsFileEvent(final 
PipeTsFileInsertionEvent sourceEvent) {
     return !sourceEvent.shouldParseTimeOrPattern()
         || (sourceEvent.isTableModelEvent()
             && (sourceEvent.getTablePattern() == null
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 14e8c27902c..49ac3f6b8dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -245,6 +247,10 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
 
   @Override
   protected boolean onEvent(final TsFileInsertionEvent event) {
+    if 
(!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) 
event)) {
+      return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
+    }
+
     final SubscriptionCommitContext commitContext = 
generateSubscriptionCommitContext();
     final SubscriptionEvent ev =
         new SubscriptionEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 6f56ed3887b..7902a069aa0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.event.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -82,10 +83,29 @@ public class SubscriptionPipeTsFileEventBatch extends 
SubscriptionPipeEventBatch
 
   @Override
   protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
-    LOGGER.warn(
-        "SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} 
when batching.",
-        this,
-        event);
+    // TODO: parse tsfile event on the fly like 
SubscriptionPipeTabletEventBatch
+    try {
+      for (final TabletInsertionEvent parsedEvent : 
event.toTabletInsertionEvents()) {
+        if (!((PipeRawTabletInsertionEvent) parsedEvent)
+            .increaseReferenceCount(this.getClass().getName())) {
+          LOGGER.warn(
+              "SubscriptionPipeTsFileEventBatch: Failed to increase the 
reference count of event {}, skipping it.",
+              ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
+        } else {
+          try {
+            batch.onEvent(parsedEvent);
+          } catch (final Exception ignored) {
+            // no exceptions will be thrown
+          }
+        }
+      }
+    } finally {
+      try {
+        event.close();
+      } catch (final Exception ignored) {
+        // no exceptions will be thrown
+      }
+    }
   }
 
   @Override

Reply via email to