This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new bae30752517 [To dev/1.3] Change Query Data Source init way from lock 
to tryLock & Add more log and remove useless synchronize
bae30752517 is described below

commit bae307525177e05c8c8b802f743f80248e138ff0
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Aug 26 13:57:23 2025 +0800

    [To dev/1.3] Change Query Data Source init way from lock to tryLock & Add 
more log and remove useless synchronize
    
    (cherry picked from commit 1b377c049d80a5e83520746c0952a83f5f03bac0)
---
 .../queryengine/execution/driver/DataDriver.java   |  44 ++-
 .../fragment/FakedFragmentInstanceContext.java     |   5 +-
 .../fragment/FragmentInstanceContext.java          | 245 ++++++++----
 .../fragment/FragmentInstanceExecution.java        |   8 +-
 .../queryengine/metric/QueryResourceMetricSet.java |  26 +-
 .../FragmentInstanceStatisticsDrawer.java          |  11 +
 .../db/storageengine/dataregion/DataRegion.java    | 411 +++++++++++++++------
 .../dataregion/IDataRegionForQuery.java            |  29 +-
 .../dataregion/VirtualDataRegion.java              |  36 +-
 .../memtable/AbstractWritableMemChunk.java         |  11 +
 .../dataregion/memtable/TsFileProcessor.java       |  63 +++-
 .../dataregion/tsfile/TsFileManager.java           |  41 ++
 .../db/queryengine/execution/DataDriverTest.java   |  11 +-
 .../src/main/thrift/datanode.thrift                |   1 +
 14 files changed, 693 insertions(+), 249 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
index a225793437f..743a05d3777 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
@@ -32,6 +32,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.util.List;
 
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.QUERY_RESOURCE_INIT;
+import static 
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE;
 
 /**
  * One {@link DataDriver} is responsible for one {@link FragmentInstance} 
which is for data query,
@@ -40,7 +41,7 @@ import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.QUE
 @NotThreadSafe
 public class DataDriver extends Driver {
 
-  private boolean init;
+  private boolean init = false;
 
   // Unit : Byte
   private final long estimatedMemorySize;
@@ -55,7 +56,13 @@ public class DataDriver extends Driver {
   protected boolean init(SettableFuture<?> blockedFuture) {
     if (!init) {
       try {
-        initialize();
+        if (!initialize()) { // failed to init this time, but now exception 
thrown, possibly failed
+          // to acquire lock within the specific time
+          blockedFuture.set(null);
+          return false;
+        } else {
+          return true;
+        }
       } catch (Throwable t) {
         LOGGER.error(
             "Failed to do the initialization for driver {} ", 
driverContext.getDriverTaskID(), t);
@@ -78,8 +85,7 @@ public class DataDriver extends Driver {
    *     org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} is 
null after
    *     initialization, IllegalStateException will be thrown
    */
-  private void initialize() throws QueryProcessException {
-    long startTime = System.nanoTime();
+  private boolean initialize() throws QueryProcessException {
     try {
       List<DataSourceOperator> sourceOperators =
           ((DataDriverContext) driverContext).getSourceOperators();
@@ -90,20 +96,30 @@ public class DataDriver extends Driver {
           // for some reasons, we may get null QueryDataSource here.
           // And it's safe for us to throw this exception here in such case.
           throw new IllegalStateException("QueryDataSource should never be 
null!");
+        } else if (dataSource == UNFINISHED_QUERY_DATA_SOURCE) {
+          // init query data source timeout. Maybe failed to acquire the read 
lock within the
+          // specified time
+          // do nothing, wait for next try
+        } else {
+          sourceOperators.forEach(
+              sourceOperator -> {
+                // Construct QueryDataSource for source operator
+                sourceOperator.initQueryDataSource(dataSource.clone());
+              });
+          this.init = true;
         }
-        sourceOperators.forEach(
-            sourceOperator -> {
-              // Construct QueryDataSource for source operator
-              sourceOperator.initQueryDataSource(dataSource.clone());
-            });
+      } else {
+        this.init = true;
       }
-
-      this.init = true;
     } finally {
-      ((DataDriverContext) driverContext).clearSourceOperators();
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+      if (this.init) {
+        ((DataDriverContext) driverContext).clearSourceOperators();
+        QUERY_EXECUTION_METRICS.recordExecutionCost(
+            QUERY_RESOURCE_INIT,
+            
driverContext.getFragmentInstanceContext().getInitQueryDataSourceCost());
+      }
     }
+    return this.init;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
index a91e41d2379..a76ca60fd5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
@@ -48,7 +48,7 @@ public class FakedFragmentInstanceContext extends 
FragmentInstanceContext {
 
   public void initQueryDataSource(PartialPath sourcePath) throws 
QueryProcessException {
 
-    dataRegion.readLock();
+    dataRegion.tryReadLock(Long.MAX_VALUE);
     try {
       this.sharedQueryDataSource =
           dataRegion.query(
@@ -56,7 +56,8 @@ public class FakedFragmentInstanceContext extends 
FragmentInstanceContext {
               sourcePath.getDevice(),
               this,
               getGlobalTimeFilter(),
-              null);
+              null,
+              Long.MAX_VALUE);
 
       // used files should be added before mergeLock is unlocked, or they may 
be deleted by
       // running merge
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 5510394396d..cdb44db5f66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
+import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
@@ -73,11 +74,12 @@ import java.util.stream.Collectors;
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
 import static 
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE;
+import static 
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE;
 
 public class FragmentInstanceContext extends QueryContext {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FragmentInstanceContext.class);
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
   private static final long END_TIME_INITIAL_VALUE = -1L;
   // wait over 5s for driver to close is abnormal
   private static final long LONG_WAIT_DURATION = 5_000_000_000L;
@@ -139,6 +141,7 @@ public class FragmentInstanceContext extends QueryContext {
   private TFetchFragmentInstanceStatisticsResp fragmentInstanceStatistics = 
null;
 
   private long initQueryDataSourceCost = 0;
+  private int initQueryDataSourceRetryCount = 0;
   private final AtomicLong readyQueueTime = new AtomicLong(0);
   private final AtomicLong blockQueueTime = new AtomicLong(0);
   private long unclosedSeqFileNum = 0;
@@ -353,7 +356,7 @@ public class FragmentInstanceContext extends QueryContext {
               PatternTreeMap<Modification, 
PatternTreeMapFactory.ModsSerializer> allMods =
                   loadAllModificationsFromDisk(resource);
               atomicReference.set(allMods);
-              if (cachedModEntriesSize.get() >= 
config.getModsCacheSizeLimitPerFI()) {
+              if (cachedModEntriesSize.get() >= 
CONFIG.getModsCacheSizeLimitPerFI()) {
                 return null;
               }
               long memCost =
@@ -361,7 +364,7 @@ public class FragmentInstanceContext extends QueryContext {
                       + 
RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
               long alreadyUsedMemoryForCachedModEntries = 
cachedModEntriesSize.get();
               while (alreadyUsedMemoryForCachedModEntries + memCost
-                  < config.getModsCacheSizeLimitPerFI()) {
+                  < CONFIG.getModsCacheSizeLimitPerFI()) {
                 if (cachedModEntriesSize.compareAndSet(
                     alreadyUsedMemoryForCachedModEntries,
                     alreadyUsedMemoryForCachedModEntries + memCost)) {
@@ -476,10 +479,6 @@ public class FragmentInstanceContext extends QueryContext {
     return dataNodeQueryContext;
   }
 
-  public void setDataNodeQueryContext(DataNodeQueryContext 
dataNodeQueryContext) {
-    this.dataNodeQueryContext = dataNodeQueryContext;
-  }
-
   public FragmentInstanceInfo getInstanceInfo() {
     return getErrorCode()
         .map(
@@ -535,11 +534,11 @@ public class FragmentInstanceContext extends QueryContext 
{
     memoryReservationManager.releaseAllReservedMemory();
   }
 
-  public void initQueryDataSource(List<PartialPath> sourcePaths) throws 
QueryProcessException {
+  public boolean initQueryDataSource(List<PartialPath> sourcePaths) throws 
QueryProcessException {
     long startTime = System.nanoTime();
     if (sourcePaths == null || sourcePaths.isEmpty()) {
       this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE;
-      return;
+      return true;
     }
     String singleDevice = null;
     if (sourcePaths.size() == 1) {
@@ -560,82 +559,136 @@ public class FragmentInstanceContext extends 
QueryContext {
         }
       }
     }
-    dataRegion.readLock();
-    try {
-      this.sharedQueryDataSource =
-          dataRegion.query(
-              sourcePaths,
-              // when all the selected series are under the same device, the 
QueryDataSource will be
-              // filtered according to timeIndex
-              singleDevice,
-              this,
-              // time filter may be stateful, so we need to copy it
-              globalTimeFilter != null ? globalTimeFilter.copy() : null,
-              timePartitions);
-
-      // used files should be added before mergeLock is unlocked, or they may 
be deleted by
-      // running merge
-      if (sharedQueryDataSource != null) {
-        closedFilePaths = new HashSet<>();
-        unClosedFilePaths = new HashSet<>();
-        addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
-        ((QueryDataSource) sharedQueryDataSource).setSingleDevice(singleDevice 
!= null);
+
+    long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs();
+    long startAcquireLockTime = System.nanoTime();
+    if (dataRegion.tryReadLock(waitForLockTime)) {
+      try {
+        // minus already consumed time
+        waitForLockTime -= (System.nanoTime() - startAcquireLockTime) / 
1_000_000;
+
+        // no remaining time slice
+        if (waitForLockTime <= 0) {
+          return false;
+        }
+
+        this.sharedQueryDataSource =
+            dataRegion.query(
+                sourcePaths,
+                // when all the selected series are under the same device, the 
QueryDataSource will
+                // be
+                // filtered according to timeIndex
+                singleDevice,
+                this,
+                // time filter may be stateful, so we need to copy it
+                globalTimeFilter != null ? globalTimeFilter.copy() : null,
+                timePartitions,
+                waitForLockTime);
+
+        // used files should be added before mergeLock is unlocked, or they 
may be deleted by
+        // running merge
+        if (sharedQueryDataSource != null) {
+          closedFilePaths = new HashSet<>();
+          unClosedFilePaths = new HashSet<>();
+          addUsedFilesForQuery((QueryDataSource) sharedQueryDataSource);
+          ((QueryDataSource) 
sharedQueryDataSource).setSingleDevice(singleDevice != null);
+          return true;
+        } else {
+          // failed to acquire lock within the specific time
+          return false;
+        }
+      } finally {
+        addInitQueryDataSourceCost(System.nanoTime() - startTime);
+        dataRegion.readUnlock();
       }
-    } finally {
-      setInitQueryDataSourceCost(System.nanoTime() - startTime);
-      dataRegion.readUnlock();
+    } else {
+      addInitQueryDataSourceCost(System.nanoTime() - startTime);
+      return false;
     }
   }
 
-  public void initRegionScanQueryDataSource(Map<IDeviceID, DeviceContext> 
devicePathsToContext)
-      throws QueryProcessException {
+  public boolean initRegionScanQueryDataSource(Map<IDeviceID, DeviceContext> 
devicePathsToContext) {
     long startTime = System.nanoTime();
     if (devicePathsToContext == null) {
-      return;
+      return true;
     }
-    dataRegion.readLock();
-    try {
-      this.sharedQueryDataSource =
-          dataRegion.queryForDeviceRegionScan(
-              devicePathsToContext,
-              this,
-              globalTimeFilter != null ? globalTimeFilter.copy() : null,
-              timePartitions);
-
-      if (sharedQueryDataSource != null) {
-        closedFilePaths = new HashSet<>();
-        unClosedFilePaths = new HashSet<>();
-        addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) 
sharedQueryDataSource);
+
+    long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs();
+    if (dataRegion.tryReadLock(waitForLockTime)) {
+      try {
+        // minus already consumed time
+        waitForLockTime -= (System.nanoTime() - startTime) / 1_000_000;
+
+        // no remaining time slice
+        if (waitForLockTime <= 0) {
+          return false;
+        }
+        this.sharedQueryDataSource =
+            dataRegion.queryForDeviceRegionScan(
+                devicePathsToContext,
+                this,
+                globalTimeFilter != null ? globalTimeFilter.copy() : null,
+                timePartitions,
+                waitForLockTime);
+
+        if (sharedQueryDataSource != null) {
+          closedFilePaths = new HashSet<>();
+          unClosedFilePaths = new HashSet<>();
+          addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) 
sharedQueryDataSource);
+          return true;
+        } else {
+          // failed to acquire lock within the specific time
+          return false;
+        }
+      } finally {
+        addInitQueryDataSourceCost(System.nanoTime() - startTime);
+        dataRegion.readUnlock();
       }
-    } finally {
-      setInitQueryDataSourceCost(System.nanoTime() - startTime);
-      dataRegion.readUnlock();
+    } else {
+      addInitQueryDataSourceCost(System.nanoTime() - startTime);
+      return false;
     }
   }
 
-  public void initRegionScanQueryDataSource(List<PartialPath> pathList)
-      throws QueryProcessException {
+  public boolean initRegionScanQueryDataSource(List<PartialPath> pathList) {
     long startTime = System.nanoTime();
     if (pathList == null) {
-      return;
+      return true;
     }
-    dataRegion.readLock();
-    try {
-      this.sharedQueryDataSource =
-          dataRegion.queryForSeriesRegionScan(
-              pathList,
-              this,
-              globalTimeFilter != null ? globalTimeFilter.copy() : null,
-              timePartitions);
-
-      if (sharedQueryDataSource != null) {
-        closedFilePaths = new HashSet<>();
-        unClosedFilePaths = new HashSet<>();
-        addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) 
sharedQueryDataSource);
+    long waitForLockTime = CONFIG.getDriverTaskExecutionTimeSliceInMs();
+    if (dataRegion.tryReadLock(waitForLockTime)) {
+      // minus already consumed time
+      waitForLockTime -= (System.nanoTime() - startTime) / 1_000_000;
+
+      // no remaining time slice
+      if (waitForLockTime <= 0) {
+        return false;
+      }
+      try {
+        this.sharedQueryDataSource =
+            dataRegion.queryForSeriesRegionScan(
+                pathList,
+                this,
+                globalTimeFilter != null ? globalTimeFilter.copy() : null,
+                timePartitions,
+                waitForLockTime);
+
+        if (sharedQueryDataSource != null) {
+          closedFilePaths = new HashSet<>();
+          unClosedFilePaths = new HashSet<>();
+          addUsedFilesForRegionQuery((QueryDataSourceForRegionScan) 
sharedQueryDataSource);
+          return true;
+        } else {
+          // failed to acquire lock within the specific time
+          return false;
+        }
+      } finally {
+        addInitQueryDataSourceCost(System.nanoTime() - startTime);
+        dataRegion.readUnlock();
       }
-    } finally {
-      setInitQueryDataSourceCost(System.nanoTime() - startTime);
-      dataRegion.readUnlock();
+    } else {
+      addInitQueryDataSourceCost(System.nanoTime() - startTime);
+      return false;
     }
   }
 
@@ -643,17 +696,26 @@ public class FragmentInstanceContext extends QueryContext 
{
     if (sharedQueryDataSource == null) {
       switch (queryDataSourceType) {
         case SERIES_SCAN:
-          initQueryDataSource(sourcePaths);
-          // Friendly for gc
-          sourcePaths = null;
+          if (initQueryDataSource(sourcePaths)) {
+            // Friendly for gc
+            sourcePaths = null;
+          } else {
+            return getUnfinishedQueryDataSource();
+          }
           break;
         case DEVICE_REGION_SCAN:
-          initRegionScanQueryDataSource(devicePathsToContext);
-          devicePathsToContext = null;
+          if (initRegionScanQueryDataSource(devicePathsToContext)) {
+            devicePathsToContext = null;
+          } else {
+            return getUnfinishedQueryDataSource();
+          }
           break;
         case TIME_SERIES_REGION_SCAN:
-          initRegionScanQueryDataSource(sourcePaths);
-          sourcePaths = null;
+          if (initRegionScanQueryDataSource(sourcePaths)) {
+            sourcePaths = null;
+          } else {
+            return getUnfinishedQueryDataSource();
+          }
           break;
         default:
           throw new QueryProcessException(
@@ -663,6 +725,18 @@ public class FragmentInstanceContext extends QueryContext {
     return sharedQueryDataSource;
   }
 
+  private IQueryDataSource getUnfinishedQueryDataSource() {
+    increaseInitQueryDataSourceRetryCount();
+    // record warn log every 10 times retry
+    if (initQueryDataSourceRetryCount % 10 == 0) {
+      LOGGER.warn(
+          "Failed to acquire the read lock of DataRegion-{} for {} times",
+          dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(),
+          initQueryDataSourceRetryCount);
+    }
+    return UNFINISHED_QUERY_DATA_SOURCE;
+  }
+
   /** Lock and check if tsFileResource is deleted */
   private boolean processTsFileResource(TsFileResource tsFileResource, boolean 
isClosed) {
     addFilePathToMap(tsFileResource, isClosed);
@@ -840,6 +914,9 @@ public class FragmentInstanceContext extends QueryContext {
 
     
QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime);
 
+    QueryResourceMetricSet.getInstance()
+        .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount());
+
     SeriesScanCostMetricSet.getInstance()
         .recordBloomFilterMetrics(
             getQueryStatistics().getLoadBloomFilterFromCacheCount().get(),
@@ -921,7 +998,7 @@ public class FragmentInstanceContext extends QueryContext {
         
.updatePageReaderMemoryUsage(getQueryStatistics().getPageReaderMaxUsedMemorySize().get());
   }
 
-  private synchronized void releaseDataNodeQueryContext() {
+  private void releaseDataNodeQueryContext() {
     if (dataNodeQueryContextMap == null) {
       // this process is in fetch schema, nothing need to release
       return;
@@ -958,14 +1035,22 @@ public class FragmentInstanceContext extends 
QueryContext {
     return fragmentInstanceStatistics;
   }
 
-  public void setInitQueryDataSourceCost(long initQueryDataSourceCost) {
-    this.initQueryDataSourceCost = initQueryDataSourceCost;
+  public void addInitQueryDataSourceCost(long initQueryDataSourceCost) {
+    this.initQueryDataSourceCost += initQueryDataSourceCost;
   }
 
   public long getInitQueryDataSourceCost() {
     return initQueryDataSourceCost;
   }
 
+  public void increaseInitQueryDataSourceRetryCount() {
+    this.initQueryDataSourceRetryCount++;
+  }
+
+  public int getInitQueryDataSourceRetryCount() {
+    return initQueryDataSourceRetryCount;
+  }
+
   public void addReadyQueuedTime(long time) {
     readyQueueTime.addAndGet(time);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index 9a825a055c4..d8ce02aa3fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -29,7 +30,6 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager
 import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
 import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -53,6 +53,7 @@ import static 
org.apache.iotdb.db.queryengine.statistics.StatisticsMergeUtil.mer
 public class FragmentInstanceExecution {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FragmentInstanceExecution.class);
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
   private final FragmentInstanceId instanceId;
   private final FragmentInstanceContext context;
 
@@ -167,8 +168,8 @@ public class FragmentInstanceExecution {
       // We don't need to output the region having ExplainAnalyzeOperator only.
       return false;
     }
-    statistics.setDataRegion(((DataRegion) 
context.getDataRegion()).getDataRegionId());
-    
statistics.setIp(IoTDBDescriptor.getInstance().getConfig().getAddressAndPort().ip);
+    statistics.setDataRegion(context.getDataRegion().getDataRegionId());
+    statistics.setIp(CONFIG.getInternalAddress() + ":" + 
CONFIG.getInternalPort());
     statistics.setStartTimeInMS(context.getStartTime());
     statistics.setEndTimeInMS(
         context.isEndTimeUpdate() ? context.getEndTime() : 
System.currentTimeMillis());
@@ -177,6 +178,7 @@ public class FragmentInstanceExecution {
     statistics.setReadyQueuedTime(context.getReadyQueueTime());
 
     
statistics.setInitDataQuerySourceCost(context.getInitQueryDataSourceCost());
+    
statistics.setInitDataQuerySourceRetryCount(context.getInitQueryDataSourceRetryCount());
 
     statistics.setSeqClosednNum(context.getClosedSeqFileNum());
     statistics.setSeqUnclosedNum(context.getUnclosedSeqFileNum());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
index 401fc2e0c8f..241486750a4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryResourceMetricSet.java
@@ -41,10 +41,12 @@ public class QueryResourceMetricSet implements IMetricSet {
   public static final String UNSEQUENCE_TSFILE = "unsequence_tsfile";
   public static final String FLUSHING_MEMTABLE = "flushing_memtable";
   public static final String WORKING_MEMTABLE = "working_memtable";
+  public static final String INIT_QUERY_RESOURCE_RETRY_COUNT = "retry_count";
   private Histogram sequenceTsFileHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
   private Histogram unsequenceTsFileHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
   private Histogram flushingMemTableHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
   private Histogram workingMemTableHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram retryCountHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
 
   public void recordQueryResourceNum(String type, int count) {
     switch (type) {
@@ -60,11 +62,22 @@ public class QueryResourceMetricSet implements IMetricSet {
       case WORKING_MEMTABLE:
         workingMemTableHistogram.update(count);
         break;
+      case INIT_QUERY_RESOURCE_RETRY_COUNT:
+        if (count > 0) {
+          retryCountHistogram.update(count);
+        }
+        break;
       default:
         break;
     }
   }
 
+  public void recordInitQueryResourceRetryCount(int count) {
+    if (count > 0) {
+      retryCountHistogram.update(count);
+    }
+  }
+
   @Override
   public void bindTo(AbstractMetricService metricService) {
     sequenceTsFileHistogram =
@@ -91,11 +104,22 @@ public class QueryResourceMetricSet implements IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.TYPE.toString(),
             WORKING_MEMTABLE);
+    retryCountHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.QUERY_RESOURCE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.TYPE.toString(),
+            INIT_QUERY_RESOURCE_RETRY_COUNT);
   }
 
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
-    Arrays.asList(SEQUENCE_TSFILE, UNSEQUENCE_TSFILE, FLUSHING_MEMTABLE, 
WORKING_MEMTABLE)
+    Arrays.asList(
+            SEQUENCE_TSFILE,
+            UNSEQUENCE_TSFILE,
+            FLUSHING_MEMTABLE,
+            WORKING_MEMTABLE,
+            INIT_QUERY_RESOURCE_RETRY_COUNT)
         .forEach(
             type ->
                 metricService.remove(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
index 0141a86e627..b9f1a55b5bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
@@ -113,6 +113,17 @@ public class FragmentInstanceStatisticsDrawer {
           String.format(
               "Cost of initDataQuerySource: %.3f ms",
               statistics.getInitDataQuerySourceCost() * NS_TO_MS_FACTOR));
+
+      if (statistics.isSetInitDataQuerySourceRetryCount()
+          && statistics.getInitDataQuerySourceRetryCount() > 0) {
+        addLine(
+            singleFragmentInstanceArea,
+            1,
+            String.format(
+                "Retry count of initDataQuerySource: %d",
+                statistics.getInitDataQuerySourceRetryCount()));
+      }
+
       addLine(
           singleFragmentInstanceArea,
           1,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a04c81f4801..b68a7199436 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -234,11 +234,6 @@ public class DataRegion implements IDataRegionForQuery {
   /** closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done. */
   private final Object closeStorageGroupCondition = new Object();
 
-  /**
-   * Avoid some tsfileResource is changed (e.g., from unsealed to sealed) when 
a read is executed.
-   */
-  private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
-
   /** time partition id in the database -> {@link TsFileProcessor} for this 
time partition. */
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = 
new TreeMap<>();
 
@@ -1956,8 +1951,8 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** used for query engine */
   @Override
+  @TestOnly
   public QueryDataSource query(
       List<PartialPath> pathList,
       String singleDeviceId,
@@ -1965,31 +1960,154 @@ public class DataRegion implements IDataRegionForQuery 
{
       Filter globalTimeFilter,
       List<Long> timePartitions)
       throws QueryProcessException {
-    try {
-      List<TsFileResource> seqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(true, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
+    return query(
+        pathList, singleDeviceId, context, globalTimeFilter, timePartitions, 
Long.MAX_VALUE);
+  }
+
+  /** used for query engine */
+  @Override
+  public QueryDataSource query(
+      List<PartialPath> pathList,
+      String singleDeviceId,
+      QueryContext context,
+      Filter globalTimeFilter,
+      List<Long> timePartitions,
+      long waitForLockTimeInMs)
+      throws QueryProcessException {
+
+    Pair<List<TsFileResource>, List<TsFileResource>> pair =
+        tsFileManager.getAllTsFileListForQuery(timePartitions, 
globalTimeFilter);
+
+    List<TsFileResource> seqTsFileResouceList = pair.left;
+    List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+    List<TsFileProcessor> needToUnLockList = new ArrayList<>();
+
+    boolean success =
+        tryGetFLushLock(
+            waitForLockTimeInMs,
+            singleDeviceId,
+            globalTimeFilter,
+            context.isDebug(),
+            seqTsFileResouceList,
+            unSeqTsFileResouceList,
+            needToUnLockList);
+
+    if (success) {
+      try {
+        List<TsFileResource> satisfiedSeqResourceList =
+            getFileResourceListForQuery(
+                seqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, true);
+
+        List<TsFileResource> satisfiedUnSeqResourceList =
+            getFileResourceListForQuery(
+                unSeqTsFileResouceList, pathList, singleDeviceId, context, 
globalTimeFilter, false);
+
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            SEQUENCE_TSFILE, satisfiedSeqResourceList.size());
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            UNSEQUENCE_TSFILE, satisfiedUnSeqResourceList.size());
+        return new QueryDataSource(satisfiedSeqResourceList, 
satisfiedUnSeqResourceList);
+      } catch (MetadataException e) {
+        throw new QueryProcessException(e);
+      } finally {
+        clearAlreadyLockedList(needToUnLockList);
+      }
+    } else {
+      // means that failed to acquire lock within the specific time
+      return null;
+    }
+  }
+
+  /**
+   * try to get flush lock for each unclosed satisfied tsfile
+   *
+   * @return true if lock successfully, otherwise false if return false, 
needToUnLockList will
+   *     always be empty because this method is responsible for unlocking all 
the already-acquiring
+   *     lock if return true, the caller is responsible for unlocking all the 
already-acquiring lock
+   *     in needToUnLockList
+   */
+  private boolean tryGetFLushLock(
+      long waitTimeInMs,
+      String singleDeviceId,
+      Filter globalTimeFilter,
+      boolean isDebug,
+      List<TsFileResource> seqResources,
+      List<TsFileResource> unSeqResources,
+      List<TsFileProcessor> needToUnLockList) {
+    // deal with seq resources
+    for (TsFileResource tsFileResource : seqResources) {
+      // only need to acquire flush lock for those unclosed and satisfied 
tsfile
+      if (!tsFileResource.isClosed()
+          && tsFileResource.isSatisfied(
+              singleDeviceId == null ? null : new 
PlainDeviceID(singleDeviceId),
               globalTimeFilter,
-              true);
-      List<TsFileResource> unseqResources =
-          getFileResourceListForQuery(
-              tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
-              pathList,
-              singleDeviceId,
-              context,
+              true,
+              isDebug)) {
+        TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+        try {
+          long startTime = System.nanoTime();
+          if (tsFileProcessor.tryReadLock(waitTimeInMs)) {
+            // minus already consumed time
+            waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;
+
+            needToUnLockList.add(tsFileProcessor);
+            // no remaining time slice
+            if (waitTimeInMs <= 0) {
+              clearAlreadyLockedList(needToUnLockList);
+              return false;
+            }
+          } else {
+            clearAlreadyLockedList(needToUnLockList);
+            return false;
+          }
+        } catch (InterruptedException e) {
+          clearAlreadyLockedList(needToUnLockList);
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
+    // deal with unSeq resources
+    for (TsFileResource tsFileResource : unSeqResources) {
+      if (!tsFileResource.isClosed()
+          && tsFileResource.isSatisfied(
+              singleDeviceId == null ? null : new 
PlainDeviceID(singleDeviceId),
               globalTimeFilter,
-              false);
-
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, 
seqResources.size());
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, 
unseqResources.size());
+              false,
+              isDebug)) {
+        TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+        try {
+          long startTime = System.nanoTime();
+          if (tsFileProcessor.tryReadLock(waitTimeInMs)) {
+            // minus already consumed time
+            waitTimeInMs -= (System.nanoTime() - startTime) / 1_000_000;
+
+            needToUnLockList.add(tsFileProcessor);
+            // no remaining time slice
+            if (waitTimeInMs <= 0) {
+              clearAlreadyLockedList(needToUnLockList);
+              return false;
+            }
+          } else {
+            clearAlreadyLockedList(needToUnLockList);
+            return false;
+          }
+        } catch (InterruptedException e) {
+          clearAlreadyLockedList(needToUnLockList);
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 
-      return new QueryDataSource(seqResources, unseqResources);
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+  private void clearAlreadyLockedList(List<TsFileProcessor> needToUnLockList) {
+    for (TsFileProcessor processor : needToUnLockList) {
+      processor.readUnLock();
     }
+    needToUnLockList.clear();
   }
 
   @Override
@@ -1997,31 +2115,51 @@ public class DataRegion implements IDataRegionForQuery {
       List<PartialPath> pathList,
       QueryContext queryContext,
       Filter globalTimeFilter,
-      List<Long> timePartitions)
-      throws QueryProcessException {
-    try {
-      List<IFileScanHandle> seqFileScanHandles =
-          getFileHandleListForQuery(
-              tsFileManager.getTsFileList(true, timePartitions, 
globalTimeFilter),
-              pathList,
-              queryContext,
-              globalTimeFilter,
-              true);
-      List<IFileScanHandle> unseqFileScanHandles =
-          getFileHandleListForQuery(
-              tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
-              pathList,
-              queryContext,
-              globalTimeFilter,
-              false);
-
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, 
seqFileScanHandles.size());
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
-          UNSEQUENCE_TSFILE, unseqFileScanHandles.size());
+      List<Long> timePartitions,
+      long waitForLockTimeInMs) {
+    Pair<List<TsFileResource>, List<TsFileResource>> pair =
+        tsFileManager.getAllTsFileListForQuery(timePartitions, 
globalTimeFilter);
+
+    List<TsFileResource> seqTsFileResouceList = pair.left;
+    List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+    List<TsFileProcessor> needToUnLockList = new ArrayList<>();
+
+    boolean success =
+        tryGetFLushLock(
+            waitForLockTimeInMs,
+            null,
+            globalTimeFilter,
+            queryContext.isDebug(),
+            seqTsFileResouceList,
+            unSeqTsFileResouceList,
+            needToUnLockList);
+
+    if (success) {
+      try {
+        List<IFileScanHandle> seqFileScanHandles =
+            getFileHandleListForQuery(
+                seqTsFileResouceList, pathList, queryContext, 
globalTimeFilter, true);
+
+        List<IFileScanHandle> unSeqFileScanHandles =
+            getFileHandleListForQuery(
+                tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
+                pathList,
+                queryContext,
+                globalTimeFilter,
+                false);
 
-      return new QueryDataSourceForRegionScan(seqFileScanHandles, 
unseqFileScanHandles);
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            SEQUENCE_TSFILE, seqFileScanHandles.size());
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            UNSEQUENCE_TSFILE, unSeqFileScanHandles.size());
+        return new QueryDataSourceForRegionScan(seqFileScanHandles, 
unSeqFileScanHandles);
+      } finally {
+        clearAlreadyLockedList(needToUnLockList);
+      }
+    } else {
+      // means that failed to acquire lock within the specific time
+      return null;
     }
   }
 
@@ -2030,25 +2168,19 @@ public class DataRegion implements IDataRegionForQuery {
       List<PartialPath> partialPaths,
       QueryContext context,
       Filter globalTimeFilter,
-      boolean isSeq)
-      throws MetadataException {
+      boolean isSeq) {
     List<IFileScanHandle> fileScanHandles = new ArrayList<>();
 
     for (TsFileResource tsFileResource : tsFileResources) {
       if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, 
context.isDebug())) {
         continue;
       }
-      closeQueryLock.readLock().lock();
-      try {
-        if (tsFileResource.isClosed()) {
-          fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, 
context));
-        } else {
-          tsFileResource
-              .getProcessor()
-              .queryForSeriesRegionScan(partialPaths, context, 
fileScanHandles);
-        }
-      } finally {
-        closeQueryLock.readLock().unlock();
+      if (tsFileResource.isClosed()) {
+        fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, 
context));
+      } else {
+        tsFileResource
+            .getProcessor()
+            .queryForSeriesRegionScanWithoutLock(partialPaths, context, 
fileScanHandles);
       }
     }
     return fileScanHandles;
@@ -2059,31 +2191,52 @@ public class DataRegion implements IDataRegionForQuery {
       Map<IDeviceID, DeviceContext> devicePathToAligned,
       QueryContext queryContext,
       Filter globalTimeFilter,
-      List<Long> timePartitions)
-      throws QueryProcessException {
-    try {
-      List<IFileScanHandle> seqFileScanHandles =
-          getFileHandleListForQuery(
-              tsFileManager.getTsFileList(true, timePartitions, 
globalTimeFilter),
-              devicePathToAligned,
-              queryContext,
-              globalTimeFilter,
-              true);
-      List<IFileScanHandle> unseqFileScanHandles =
-          getFileHandleListForQuery(
-              tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
-              devicePathToAligned,
-              queryContext,
-              globalTimeFilter,
-              false);
+      List<Long> timePartitions,
+      long waitForLockTimeInMs) {
+
+    Pair<List<TsFileResource>, List<TsFileResource>> pair =
+        tsFileManager.getAllTsFileListForQuery(timePartitions, 
globalTimeFilter);
+
+    List<TsFileResource> seqTsFileResouceList = pair.left;
+    List<TsFileResource> unSeqTsFileResouceList = pair.right;
+
+    List<TsFileProcessor> needToUnLockList = new ArrayList<>();
 
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, 
seqFileScanHandles.size());
-      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
-          UNSEQUENCE_TSFILE, unseqFileScanHandles.size());
+    boolean success =
+        tryGetFLushLock(
+            waitForLockTimeInMs,
+            null,
+            globalTimeFilter,
+            queryContext.isDebug(),
+            seqTsFileResouceList,
+            unSeqTsFileResouceList,
+            needToUnLockList);
 
-      return new QueryDataSourceForRegionScan(seqFileScanHandles, 
unseqFileScanHandles);
-    } catch (MetadataException e) {
-      throw new QueryProcessException(e);
+    if (success) {
+      try {
+        List<IFileScanHandle> seqFileScanHandles =
+            getFileHandleListForQuery(
+                seqTsFileResouceList, devicePathToAligned, queryContext, 
globalTimeFilter, true);
+
+        List<IFileScanHandle> unSeqFileScanHandles =
+            getFileHandleListForQuery(
+                tsFileManager.getTsFileList(false, timePartitions, 
globalTimeFilter),
+                devicePathToAligned,
+                queryContext,
+                globalTimeFilter,
+                false);
+
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            SEQUENCE_TSFILE, seqFileScanHandles.size());
+        QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(
+            UNSEQUENCE_TSFILE, unSeqFileScanHandles.size());
+        return new QueryDataSourceForRegionScan(seqFileScanHandles, 
unSeqFileScanHandles);
+      } finally {
+        clearAlreadyLockedList(needToUnLockList);
+      }
+    } else {
+      // means that failed to acquire lock within the specific time
+      return null;
     }
   }
 
@@ -2092,25 +2245,19 @@ public class DataRegion implements IDataRegionForQuery {
       Map<IDeviceID, DeviceContext> devicePathsToContext,
       QueryContext context,
       Filter globalTimeFilter,
-      boolean isSeq)
-      throws MetadataException {
+      boolean isSeq) {
     List<IFileScanHandle> fileScanHandles = new ArrayList<>();
 
     for (TsFileResource tsFileResource : tsFileResources) {
       if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, 
context.isDebug())) {
         continue;
       }
-      closeQueryLock.readLock().lock();
-      try {
-        if (tsFileResource.isClosed()) {
-          fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, 
context));
-        } else {
-          tsFileResource
-              .getProcessor()
-              .queryForDeviceRegionScan(devicePathsToContext, context, 
fileScanHandles);
-        }
-      } finally {
-        closeQueryLock.readLock().unlock();
+      if (tsFileResource.isClosed()) {
+        fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, 
context));
+      } else {
+        tsFileResource
+            .getProcessor()
+            .queryForDeviceRegionScanWithoutLock(devicePathsToContext, 
context, fileScanHandles);
       }
     }
     return fileScanHandles;
@@ -2118,11 +2265,45 @@ public class DataRegion implements IDataRegionForQuery {
 
   /** lock the read lock of the insert lock */
   @Override
-  public void readLock() {
-    // apply read lock for SG insert lock to prevent inconsistent with 
concurrently writing memtable
-    insertLock.readLock().lock();
+  public boolean tryReadLock(long waitMillis) {
+    try {
+      // apply read lock for SG insert lock to prevent inconsistent with 
concurrently writing
+      // memtable
+      long startTime = System.nanoTime();
+      if (insertLock.readLock().tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
+        // minus already consumed time
+        waitMillis -= (System.nanoTime() - startTime) / 1_000_000;
+        // no remaining time slice
+        if (waitMillis <= 0) {
+          insertLock.readLock().unlock();
+          return false;
+        }
+        return tryGetTsFileManagerReadLock(waitMillis);
+      } else {
+        return false;
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+  }
+
+  private boolean tryGetTsFileManagerReadLock(long waitMillis) {
     // apply read lock for TsFileResource list
-    tsFileManager.readLock();
+    try {
+      if (tsFileManager.tryReadLock(waitMillis)) {
+        return true;
+      } else {
+        // failed to acquire tsFileManager read lock, we also need to unlock 
the insertLock
+        insertLock.readLock().unlock();
+        return false;
+      }
+    } catch (InterruptedException e) {
+      // failed to acquire tsFileManager read lock, we also need to unlock the 
insertLock
+      insertLock.readLock().unlock();
+      Thread.currentThread().interrupt();
+      return false;
+    }
   }
 
   /** unlock the read lock of insert lock */
@@ -2157,15 +2338,6 @@ public class DataRegion implements IDataRegionForQuery {
       boolean isSeq)
       throws MetadataException {
 
-    if (context.isDebug()) {
-      DEBUG_LOGGER.info(
-          "Path: {}, get tsfile list: {} isSeq: {} time filter: {}",
-          pathList,
-          tsFileResources,
-          isSeq,
-          (globalTimeFilter == null ? "null" : globalTimeFilter));
-    }
-
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
 
     for (TsFileResource tsFileResource : tsFileResources) {
@@ -2176,19 +2348,16 @@ public class DataRegion implements IDataRegionForQuery {
           context.isDebug())) {
         continue;
       }
-      closeQueryLock.readLock().lock();
       try {
         if (tsFileResource.isClosed()) {
           tsfileResourcesForQuery.add(tsFileResource);
         } else {
           tsFileResource
               .getProcessor()
-              .query(pathList, context, tsfileResourcesForQuery, 
globalTimeFilter);
+              .queryWithoutLock(pathList, context, tsfileResourcesForQuery, 
globalTimeFilter);
         }
       } catch (IOException e) {
         throw new MetadataException(e);
-      } finally {
-        closeQueryLock.readLock().unlock();
       }
     }
     return tsfileResourcesForQuery;
@@ -2660,9 +2829,9 @@ public class DataRegion implements IDataRegionForQuery {
       isValidateTsFileFailed =
           
!TsFileValidator.getInstance().validateTsFile(tsFileProcessor.getTsFileResource());
     }
-    closeQueryLock.writeLock().lock();
+    tsFileProcessor.writeLock();
     try {
-      tsFileProcessor.close();
+      tsFileProcessor.closeWithoutSettingResourceStatus();
       if (isEmptyFile) {
         tsFileProcessor.getTsFileResource().remove();
       } else if (isValidateTsFileFailed) {
@@ -2671,10 +2840,11 @@ public class DataRegion implements IDataRegionForQuery {
         renameAndHandleError(
             tsFilePath + RESOURCE_SUFFIX, tsFilePath + RESOURCE_SUFFIX + 
BROKEN_SUFFIX);
       } else {
+        
tsFileProcessor.getTsFileResource().setStatus(TsFileResourceStatus.NORMAL);
         
tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource());
       }
     } finally {
-      closeQueryLock.writeLock().unlock();
+      tsFileProcessor.writeUnlock();
     }
     if (isEmptyFile || isValidateTsFileFailed) {
       tsFileManager.remove(tsFileProcessor.getTsFileResource(), 
tsFileProcessor.isSequence());
@@ -3424,6 +3594,7 @@ public class DataRegion implements IDataRegionForQuery {
     return tsFileManager.getTsFileList(false);
   }
 
+  @Override
   public String getDataRegionId() {
     return dataRegionId;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
index c18262145e5..14c6db513fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.DeviceContext;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
@@ -34,12 +35,12 @@ import java.util.Map;
 /** It's an interface that storage engine must provide for query engine */
 public interface IDataRegionForQuery {
 
-  /** lock the read lock for thread-safe */
-  void readLock();
+  boolean tryReadLock(long waitMillis);
 
   void readUnlock();
 
   /** Get satisfied QueryDataSource from DataRegion for seriesScan */
+  @TestOnly
   QueryDataSource query(
       List<PartialPath> pathList,
       String singleDeviceId,
@@ -48,21 +49,37 @@ public interface IDataRegionForQuery {
       List<Long> timePartitions)
       throws QueryProcessException;
 
+  /**
+   * Get satisfied QueryDataSource from DataRegion for seriesScan
+   *
+   * @return null means that failed to acquire lock within the specific time
+   */
+  QueryDataSource query(
+      List<PartialPath> pathList,
+      String singleDeviceId,
+      QueryContext context,
+      Filter globalTimeFilter,
+      List<Long> timePartitions,
+      long waitForLockTimeInMs)
+      throws QueryProcessException;
+
   /** Get satisfied QueryDataSource from DataRegion for regionScan */
   IQueryDataSource queryForDeviceRegionScan(
       Map<IDeviceID, DeviceContext> devicePathsToContext,
       QueryContext queryContext,
       Filter globalTimeFilter,
-      List<Long> timePartitions)
-      throws QueryProcessException;
+      List<Long> timePartitions,
+      long waitForLockTimeInMs);
 
   IQueryDataSource queryForSeriesRegionScan(
       List<PartialPath> pathList,
       QueryContext queryContext,
       Filter globalTimeFilter,
-      List<Long> timePartitions)
-      throws QueryProcessException;
+      List<Long> timePartitions,
+      long waitForLockTimeInMs);
 
   /** Get database name of this DataRegion */
   String getDatabaseName();
+
+  String getDataRegionId();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
index c71111324c4..9e2f53b6b18 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
@@ -41,9 +41,14 @@ public class VirtualDataRegion implements 
IDataRegionForQuery {
 
   private static final String VIRTUAL_DB_NAME = "root.__virtual";
 
+  private static final String VIRTUAL_DATA_REGION_ID = "virtual_data_region";
+
   public static final QueryDataSource EMPTY_QUERY_DATA_SOURCE =
       new QueryDataSource(Collections.emptyList(), Collections.emptyList());
 
+  public static final QueryDataSource UNFINISHED_QUERY_DATA_SOURCE =
+      new QueryDataSource(Collections.emptyList(), Collections.emptyList());
+
   private static final QueryDataSourceForRegionScan 
EMPTY_REGION_QUERY_DATA_SOURCE =
       new QueryDataSourceForRegionScan(Collections.emptyList(), 
Collections.emptyList());
 
@@ -52,8 +57,9 @@ public class VirtualDataRegion implements IDataRegionForQuery 
{
   }
 
   @Override
-  public void readLock() {
-    // do nothing, because itself is thread-safe already
+  public boolean tryReadLock(long waitMillis) {
+    // do nothing, always return true
+    return true;
   }
 
   @Override
@@ -69,6 +75,19 @@ public class VirtualDataRegion implements 
IDataRegionForQuery {
       Filter globalTimeFilter,
       List<Long> timePartitions)
       throws QueryProcessException {
+    return query(
+        pathList, singleDeviceId, context, globalTimeFilter, timePartitions, 
Long.MAX_VALUE);
+  }
+
+  @Override
+  public QueryDataSource query(
+      List<PartialPath> pathList,
+      String singleDeviceId,
+      QueryContext context,
+      Filter globalTimeFilter,
+      List<Long> timePartitions,
+      long waitForLockTimeInMs)
+      throws QueryProcessException {
     return EMPTY_QUERY_DATA_SOURCE;
   }
 
@@ -77,8 +96,8 @@ public class VirtualDataRegion implements IDataRegionForQuery 
{
       Map<IDeviceID, DeviceContext> devicePathsToContext,
       QueryContext queryContext,
       Filter globalTimeFilter,
-      List<Long> timePartitions)
-      throws QueryProcessException {
+      List<Long> timePartitions,
+      long waitForLockTimeInMs) {
     return EMPTY_REGION_QUERY_DATA_SOURCE;
   }
 
@@ -87,8 +106,8 @@ public class VirtualDataRegion implements 
IDataRegionForQuery {
       List<PartialPath> pathList,
       QueryContext queryContext,
       Filter globalTimeFilter,
-      List<Long> timePartitions)
-      throws QueryProcessException {
+      List<Long> timePartitions,
+      long waitForLockTimeInMs) {
     return EMPTY_REGION_QUERY_DATA_SOURCE;
   }
 
@@ -97,6 +116,11 @@ public class VirtualDataRegion implements 
IDataRegionForQuery {
     return VIRTUAL_DB_NAME;
   }
 
+  @Override
+  public String getDataRegionId() {
+    return VIRTUAL_DATA_REGION_ID;
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index f797613b078..58aabe4a5ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -32,12 +32,16 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.chunk.IChunkWriter;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
 public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractWritableMemChunk.class);
+
   protected static long RETRY_INTERVAL_MS = 100L;
   protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
 
@@ -53,11 +57,18 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
   protected void maybeReleaseTvList(TVList tvList) {
     long startTimeInMs = System.currentTimeMillis();
     boolean succeed = false;
+    int retryCount = 0;
     while (!succeed) {
       try {
         tryReleaseTvList(tvList);
         succeed = true;
       } catch (MemoryNotEnoughException ex) {
+        // print log every 5 seconds
+        if (retryCount % 50 == 0) {
+          LOGGER.warn(
+              "Failed to transfer tvlist memory owner to query engine, {}", 
ex.getMessage());
+        }
+        retryCount++;
         long waitQueryInMs = System.currentTimeMillis() - startTimeInMs;
         if (waitQueryInMs > MAX_WAIT_QUERY_MS) {
           // Abort first query in the list. When all queries in the list have 
been aborted,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 34070a848c9..2d58015a596 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1633,10 +1633,10 @@ public class TsFileProcessor {
   }
 
   /** Close this tsfile */
-  public void close() throws TsFileProcessorException {
+  public void closeWithoutSettingResourceStatus() throws 
TsFileProcessorException {
     try {
       // When closing resource file, its corresponding mod file is also closed.
-      tsFileResource.close();
+      tsFileResource.closeWithoutSettingStatus();
     } catch (IOException e) {
       throw new TsFileProcessorException(e);
     }
@@ -1820,7 +1820,7 @@ public class TsFileProcessor {
     return new ArrayList<>(alignedChunkMetadataForOneDevice);
   }
 
-  public void queryForSeriesRegionScan(
+  public void queryForSeriesRegionScanWithoutLock(
       List<PartialPath> pathList,
       QueryContext queryContext,
       List<IFileScanHandle> fileScanHandlesForQuery) {
@@ -1829,7 +1829,6 @@ public class TsFileProcessor {
       Map<IDeviceID, Map<String, List<IChunkHandle>>> 
deviceToMemChunkHandleMap = new HashMap<>();
       Map<IDeviceID, Map<String, List<IChunkMetadata>>> 
deviceToChunkMetadataListMap =
           new HashMap<>();
-      flushQueryLock.readLock().lock();
       try {
         for (PartialPath seriesPath : pathList) {
           Map<String, List<IChunkMetadata>> measurementToChunkMetaList = new 
HashMap<>();
@@ -1884,9 +1883,6 @@ public class TsFileProcessor {
         QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, 
flushingMemTables.size());
         QUERY_RESOURCE_METRICS.recordQueryResourceNum(
             WORKING_MEMTABLE, workMemTable != null ? 1 : 0);
-
-        flushQueryLock.readLock().unlock();
-        logFlushQueryReadUnlocked();
       }
       if (!deviceToMemChunkHandleMap.isEmpty() || 
!deviceToChunkMetadataListMap.isEmpty()) {
         fileScanHandlesForQuery.add(
@@ -1903,7 +1899,7 @@ public class TsFileProcessor {
    * Construct IFileScanHandle for data in memtable and the other ones in 
flushing memtables. Then
    * get the related ChunkMetadata of data on disk.
    */
-  public void queryForDeviceRegionScan(
+  public void queryForDeviceRegionScanWithoutLock(
       Map<IDeviceID, DeviceContext> devicePathsToContext,
       QueryContext queryContext,
       List<IFileScanHandle> fileScanHandlesForQuery) {
@@ -1912,7 +1908,6 @@ public class TsFileProcessor {
       Map<IDeviceID, Map<String, List<IChunkHandle>>> 
deviceToMemChunkHandleMap = new HashMap<>();
       Map<IDeviceID, Map<String, List<IChunkMetadata>>> 
deviceToChunkMetadataListMap =
           new HashMap<>();
-      flushQueryLock.readLock().lock();
       try {
         for (Map.Entry<IDeviceID, DeviceContext> entry : 
devicePathsToContext.entrySet()) {
           IDeviceID deviceID = entry.getKey();
@@ -1968,9 +1963,6 @@ public class TsFileProcessor {
         QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, 
flushingMemTables.size());
         QUERY_RESOURCE_METRICS.recordQueryResourceNum(
             WORKING_MEMTABLE, workMemTable != null ? 1 : 0);
-
-        flushQueryLock.readLock().unlock();
-        logFlushQueryReadUnlocked();
       }
 
       if (!deviceToMemChunkHandleMap.isEmpty() || 
!deviceToChunkMetadataListMap.isEmpty()) {
@@ -1991,18 +1983,40 @@ public class TsFileProcessor {
    *
    * @param seriesPaths selected paths
    */
+  public void queryWithoutLock(
+      List<PartialPath> seriesPaths,
+      QueryContext context,
+      List<TsFileResource> tsfileResourcesForQuery,
+      Filter globalTimeFilter)
+      throws IOException {
+    query(seriesPaths, context, tsfileResourcesForQuery, globalTimeFilter, 
false);
+  }
+
+  @TestOnly
   public void query(
       List<PartialPath> seriesPaths,
       QueryContext context,
       List<TsFileResource> tsfileResourcesForQuery,
       Filter globalTimeFilter)
       throws IOException {
+    query(seriesPaths, context, tsfileResourcesForQuery, globalTimeFilter, 
true);
+  }
+
+  private void query(
+      List<PartialPath> seriesPaths,
+      QueryContext context,
+      List<TsFileResource> tsfileResourcesForQuery,
+      Filter globalTimeFilter,
+      boolean needLock)
+      throws IOException {
     long startTime = System.nanoTime();
     try {
       Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap = new 
HashMap<>();
       Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new 
HashMap<>();
 
-      flushQueryLock.readLock().lock();
+      if (needLock) {
+        flushQueryLock.readLock().lock();
+      }
       try {
         for (PartialPath seriesPath : seriesPaths) {
           List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>();
@@ -2047,9 +2061,10 @@ public class TsFileProcessor {
         QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, 
flushingMemTables.size());
         QUERY_RESOURCE_METRICS.recordQueryResourceNum(
             WORKING_MEMTABLE, workMemTable != null ? 1 : 0);
-
-        flushQueryLock.readLock().unlock();
-        logFlushQueryReadUnlocked();
+        if (needLock) {
+          flushQueryLock.readLock().unlock();
+          logFlushQueryReadUnlocked();
+        }
       }
 
       if (!pathToReadOnlyMemChunkMap.isEmpty() || 
!pathToChunkMetadataListMap.isEmpty()) {
@@ -2159,6 +2174,22 @@ public class TsFileProcessor {
     return flushingMemTables;
   }
 
+  public void writeLock() {
+    flushQueryLock.writeLock().lock();
+  }
+
+  public void writeUnlock() {
+    flushQueryLock.writeLock().unlock();
+  }
+
+  public boolean tryReadLock(long waitInMs) throws InterruptedException {
+    return flushQueryLock.readLock().tryLock(waitInMs, TimeUnit.MILLISECONDS);
+  }
+
+  public void readUnLock() {
+    flushQueryLock.readLock().unlock();
+  }
+
   private void logFlushQueryWriteLocked() {
     if (logger.isDebugEnabled()) {
       logger.debug(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index fb7d3303576..1c831534991 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeInd
 import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 
 import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -96,6 +98,41 @@ public class TsFileManager {
     }
   }
 
+  /**
+   * don't need to acquire lock again, caller should guarantee the lock has 
been acquired
+   *
+   * @return left is seq resource list, right is unSeq resource list
+   */
+  public Pair<List<TsFileResource>, List<TsFileResource>> 
getAllTsFileListForQuery(
+      List<Long> timePartitions, Filter timeFilter) {
+    List<TsFileResource> seq = new ArrayList<>();
+    List<TsFileResource> unSeq = new ArrayList<>();
+    if (timePartitions == null) {
+      for (Map.Entry<Long, TsFileResourceList> entry : 
sequenceFiles.entrySet()) {
+        if (TimePartitionUtils.satisfyTimePartition(timeFilter, 
entry.getKey())) {
+          seq.addAll(entry.getValue().getArrayList());
+        }
+      }
+      for (Map.Entry<Long, TsFileResourceList> entry : 
unsequenceFiles.entrySet()) {
+        if (TimePartitionUtils.satisfyTimePartition(timeFilter, 
entry.getKey())) {
+          unSeq.addAll(entry.getValue().getArrayList());
+        }
+      }
+    } else {
+      for (Long timePartitionId : timePartitions) {
+        TsFileResourceList tsFileResources = 
sequenceFiles.get(timePartitionId);
+        if (tsFileResources != null) {
+          seq.addAll(tsFileResources.getArrayList());
+        }
+        tsFileResources = unsequenceFiles.get(timePartitionId);
+        if (tsFileResources != null) {
+          unSeq.addAll(tsFileResources.getArrayList());
+        }
+      }
+    }
+    return new Pair<>(seq, unSeq);
+  }
+
   public List<TsFileResource> getTsFileListSnapshot(long timePartition, 
boolean sequence) {
     readLock();
     try {
@@ -345,6 +382,10 @@ public class TsFileManager {
     resourceListLock.readLock().lock();
   }
 
+  public boolean tryReadLock(long waitMillis) throws InterruptedException {
+    return resourceListLock.readLock().tryLock(waitMillis, 
TimeUnit.MILLISECONDS);
+  }
+
   public void readUnlock() {
     resourceListLock.readLock().unlock();
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 4920f39a645..1f092e4c97c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -74,6 +74,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
 
 public class DataDriverTest {
 
@@ -91,6 +92,7 @@ public class DataDriverTest {
 
   @Before
   public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    
IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000);
     SeriesReaderTestUtil.setUp(
         measurementSchemas, deviceIds, seqResources, unSeqResources, 
DATA_DRIVER_TEST_SG);
   }
@@ -116,6 +118,7 @@ public class DataDriverTest {
       FragmentInstanceStateMachine stateMachine =
           new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
       DataRegion dataRegion = Mockito.mock(DataRegion.class);
+      Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
       FragmentInstanceContext fragmentInstanceContext =
           createFragmentInstanceContext(instanceId, stateMachine);
       fragmentInstanceContext.setDataRegion(dataRegion);
@@ -174,10 +177,16 @@ public class DataDriverTest {
       LimitOperator limitOperator =
           new LimitOperator(driverContext.getOperatorContexts().get(3), 250, 
timeJoinOperator);
 
+      fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
       String deviceId = DATA_DRIVER_TEST_SG + ".device0";
       Mockito.when(
               dataRegion.query(
-                  driverContext.getPaths(), deviceId, fragmentInstanceContext, 
null, null))
+                  eq(driverContext.getPaths()),
+                  eq(deviceId),
+                  eq(fragmentInstanceContext),
+                  Mockito.isNull(),
+                  Mockito.isNull(),
+                  Mockito.anyLong()))
           .thenReturn(new QueryDataSource(seqResources, unSeqResources));
       fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
       fragmentInstanceContext.initializeNumOfDrivers(1);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index f7a9c4c3854..da5fac70664 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -660,6 +660,7 @@ struct TFetchFragmentInstanceStatisticsResp {
   14: optional i64 blockQueuedTime
   15: optional string ip
   16: optional string state
+  17: optional i32 initDataQuerySourceRetryCount
 }
 
 /**

Reply via email to