This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch time-opti
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f597a46788ebdf9b74ed2b49d87e41148fb48410
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 10:39:30 2026 +0800

    fix
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  6 ++--
 .../agent/task/builder/PipeDataNodeBuilder.java    |  8 ++----
 .../agent/task/stage/PipeTaskProcessorStage.java   |  2 +-
 .../pipe/agent/task/stage/PipeTaskSourceStage.java |  2 +-
 .../source/dataregion/IoTDBDataRegionSource.java   | 32 ++++++++++++++--------
 5 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 06d6a512b30..cce25e7d2ba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -170,13 +170,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
      final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters();
       final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
       final boolean needConstructDataRegionTask =
-          StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
+          StorageEngine.getInstance().getDataRegion(dataRegionId) != null
               && DataRegionListeningFilter.shouldDataRegionBeListened(
                   sourceParameters, dataRegionId);
       final boolean needConstructSchemaRegionTask =
-          SchemaEngine.getInstance()
-                  .getAllSchemaRegionIds()
-                  .contains(new SchemaRegionId(consensusGroupId))
+          SchemaEngine.getInstance().getSchemaRegion(new SchemaRegionId(consensusGroupId)) != null
               && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
                   consensusGroupId, sourceParameters);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index 46a10135d88..f4655c7ddc7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class PipeDataNodeBuilder {
@@ -53,9 +52,6 @@ public class PipeDataNodeBuilder {
     final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
     final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
 
-    final List<DataRegionId> dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds();
-    final List<SchemaRegionId> schemaRegionIds = SchemaEngine.getInstance().getAllSchemaRegionIds();
-
     final Map<Integer, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>();
     for (Map.Entry<Integer, PipeTaskMeta> consensusGroupIdToPipeTaskMeta :
         pipeRuntimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
@@ -66,11 +62,11 @@ public class PipeDataNodeBuilder {
         final PipeParameters sourceParameters = pipeStaticMeta.getSourceParameters();
         final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
         final boolean needConstructDataRegionTask =
-            dataRegionIds.contains(dataRegionId)
+            StorageEngine.getInstance().getDataRegion(dataRegionId) != null
                 && DataRegionListeningFilter.shouldDataRegionBeListened(
                     sourceParameters, dataRegionId);
         final boolean needConstructSchemaRegionTask =
-            schemaRegionIds.contains(new SchemaRegionId(consensusGroupId))
+            SchemaEngine.getInstance().getSchemaRegion(new SchemaRegionId(consensusGroupId)) != null
                 && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
                     consensusGroupId, sourceParameters);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 2373495c8eb..6eff04d0417 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -77,7 +77,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             new PipeTaskProcessorRuntimeEnvironment(
                 pipeName, creationTime, regionId, pipeTaskMeta));
     final PipeProcessor pipeProcessor =
-        StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)) != null
                 || PipeRuntimeMeta.isSourceExternal(regionId)
             ? PipeDataNodeAgent.plugin()
                 .dataRegion()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index 5f774ceb379..84f1b35378e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -49,7 +49,7 @@ public class PipeTaskSourceStage extends PipeTaskStage {
       int regionId,
       PipeTaskMeta pipeTaskMeta) {
     pipeExtractor =
-        StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)) != null
                 || PipeRuntimeMeta.isSourceExternal(regionId)
             ? PipeDataNodeAgent.plugin().dataRegion().reflectSource(sourceParameters)
             : PipeDataNodeAgent.plugin().schemaRegion().reflectSource(sourceParameters);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index a748218d250..7179f8c6872 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegio
 import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
@@ -605,19 +606,12 @@ public class IoTDBDataRegionSource extends IoTDBSource {
       if (StorageEngine.getInstance()
               .runIfPresent(
                   dataRegionIdObject,
-                  (dataRegion -> {
-                    dataRegion.writeLock(
-                        String.format("Pipe: starting %s", IoTDBDataRegionSource.class.getName()));
-                    try {
-                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
-                    } finally {
-                      dataRegion.writeUnlock();
-                    }
-                  }))
+                  dataRegion ->
+                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder, dataRegion))
           || StorageEngine.getInstance()
               .runIfAbsent(
                   dataRegionIdObject,
-                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
+                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder, null))) {
         rethrowExceptionIfAny(exceptionHolder);
 
         LOGGER.info(
@@ -634,14 +628,28 @@ public class IoTDBDataRegionSource extends IoTDBSource {
   }
 
   private void startHistoricalExtractorAndRealtimeExtractor(
-      final AtomicReference<Exception> exceptionHolder) {
+      final AtomicReference<Exception> exceptionHolder, final DataRegion dataRegion) {
     try {
       // Start realtimeSource first to avoid losing data. This may cause some
       // retransmission, yet it is OK according to the idempotency of IoTDB.
       // Note: The order of historical collection is flushing data -> adding all tsFile events.
       // There can still be writing when tsFile events are added. If we start
       // realtimeSource after the process, then this part of data will be lost.
-      realtimeSource.start();
+      if (Objects.nonNull(dataRegion)) {
+        dataRegion.writeLock(
+            String.format("Pipe: starting %s", IoTDBDataRegionSource.class.getName()));
+        try {
+          realtimeSource.start();
+        } finally {
+          dataRegion.writeUnlock();
+        }
+      } else {
+        realtimeSource.start();
+      }
+
+      // Historical extraction manages its own narrower region write lock. Keeping the outer lock
+      // only for realtime-source registration allows the expensive historical sort/materialization
+      // phase to stay out of the region critical section.
       historicalSource.start();
     } catch (final Exception e) {
       exceptionHolder.set(e);

Reply via email to