This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch addAlignedForTSGroupByQueryIntervalReq
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bfe99f3fa85153b5bab94d8540252d4b6171f2b2
Author: Beyyes <[email protected]>
AuthorDate: Wed Apr 9 13:59:43 2025 +0800

    Optimize group by query in ClientRPCServiceImpl to reduce cpu usage (#15178)
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  66 ++++++-------
 .../execution/aggregation/TreeAggregator.java      |  37 +++----
 .../execution/driver/DriverContext.java            |   6 ++
 .../fragment/FakedFragmentInstanceContext.java     | 106 +++++++++++++++++++++
 .../fragment/FragmentInstanceContext.java          |  17 +++-
 .../execution/fragment/QueryContext.java           |   2 +-
 .../execution/schedule/task/DriverTaskId.java      |   3 +-
 .../memory/FakedMemoryReservationManager.java      |  35 +++++++
 .../dataregion/tsfile/TsFileResourceList.java      |   2 +-
 .../thrift-datanode/src/main/thrift/client.thrift  |   1 +
 10 files changed, 210 insertions(+), 65 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 7c11bb54b38..b29521a3b2e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -56,23 +56,18 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.protocol.thrift.OperationType;
-import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
-import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
-import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.queryengine.execution.aggregation.TreeAggregator;
 import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FakedFragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -124,6 +119,7 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
 import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -226,7 +222,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 import static 
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
-import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
 import static 
org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq;
@@ -270,7 +265,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   private final TreeDeviceSchemaCacheManager DATA_NODE_SCHEMA_CACHE =
       TreeDeviceSchemaCacheManager.getInstance();
 
-  public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, 
TimeUnit.MILLISECONDS);
+  public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, 
TimeUnit.MILLISECONDS);
 
   private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
       TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
@@ -790,6 +785,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     }
   }
 
+  private final List<InputLocation[]> inputLocationList =
+      Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)});
+
   @SuppressWarnings("java:S2095") // close() do nothing
   private List<TsBlock> executeGroupByQueryInternal(
       SessionInfo sessionInfo,
@@ -812,21 +810,14 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
     Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1);
 
-    QueryId queryId = new QueryId("stub_query");
-    FragmentInstanceId instanceId =
-        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
-    FragmentInstanceStateMachine stateMachine =
-        new FragmentInstanceStateMachine(
-            instanceId, 
FragmentInstanceManager.getInstance().instanceNotificationExecutor);
-    FragmentInstanceContext fragmentInstanceContext =
-        createFragmentInstanceContext(
-            instanceId, stateMachine, sessionInfo, dataRegionList.get(0), 
timeFilter);
+    FakedFragmentInstanceContext fragmentInstanceContext =
+        new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0));
+
     DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
     PlanNodeId planNodeId = new PlanNodeId("1");
-    driverContext.addOperatorContext(1, planNodeId, 
SeriesScanOperator.class.getSimpleName());
-    driverContext
-        .getOperatorContexts()
-        .forEach(operatorContext -> 
operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
+    OperatorContext operatorContext =
+        new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", 
driverContext);
+    operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE);
 
     SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
     scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
@@ -844,7 +835,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
                 true,
                 true),
             AggregationStep.SINGLE,
-            Collections.singletonList(new InputLocation[] {new 
InputLocation(0, 0)}));
+            inputLocationList);
 
     GroupByTimeParameter groupByTimeParameter =
         new GroupByTimeParameter(
@@ -852,6 +843,10 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
     IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, 
dataType);
     AbstractSeriesAggregationScanOperator operator;
+    boolean canUseStatistics =
+        !TSDataType.BLOB.equals(dataType)
+            || (!TAggregationType.LAST_VALUE.equals(aggregationType)
+                && !TAggregationType.FIRST_VALUE.equals(aggregationType));
     IFullPath path;
     if (isAligned) {
       path =
@@ -865,36 +860,37 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
               (AlignedFullPath) path,
               Ordering.ASC,
               scanOptionsBuilder.build(),
-              driverContext.getOperatorContexts().get(0),
+              operatorContext,
               Collections.singletonList(aggregator),
               initTimeRangeIterator(groupByTimeParameter, true, true, 
sessionInfo.getZoneId()),
               groupByTimeParameter,
               DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
-              !TSDataType.BLOB.equals(dataType)
-                  || (!TAggregationType.LAST_VALUE.equals(aggregationType)
-                      && 
!TAggregationType.FIRST_VALUE.equals(aggregationType)));
+              canUseStatistics);
     } else {
       path = new NonAlignedFullPath(deviceID, measurementSchema);
+      //      String[] splits = device.split("\\.");
+      //      String[] fullPaths = new String[splits.length + 1];
+      //      System.arraycopy(splits, 0, fullPaths, 0, splits.length);
+      //      fullPaths[splits.length] = measurement;
+      //      path = new MeasurementPath(fullPaths, measurementSchema);
       operator =
           new SeriesAggregationScanOperator(
               planNodeId,
               path,
               Ordering.ASC,
               scanOptionsBuilder.build(),
-              driverContext.getOperatorContexts().get(0),
+              operatorContext,
               Collections.singletonList(aggregator),
               initTimeRangeIterator(groupByTimeParameter, true, true, 
sessionInfo.getZoneId()),
               groupByTimeParameter,
               DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
-              !TSDataType.BLOB.equals(dataType)
-                  || (!TAggregationType.LAST_VALUE.equals(aggregationType)
-                      && 
!TAggregationType.FIRST_VALUE.equals(aggregationType)));
+              canUseStatistics);
     }
 
     try {
       List<TsBlock> result = new ArrayList<>();
-      fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
-      
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
+      QueryDataSource dataSource = 
fragmentInstanceContext.getSharedQueryDataSource(path);
+      operator.initQueryDataSource(dataSource);
 
       while (operator.hasNext()) {
         result.add(operator.next());
@@ -904,7 +900,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } catch (Exception e) {
       throw new RuntimeException(e);
     } finally {
-      fragmentInstanceContext.releaseResource();
+      fragmentInstanceContext.releaseSharedQueryDataSource();
     }
   }
 
@@ -1300,7 +1296,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
               deviceId,
               measurementId,
               dataType,
-              true,
+              req.isAligned,
               req.getStartTime(),
               req.getEndTime(),
               req.getInterval(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
index 30ce92091d8..e1c9697e3e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
@@ -64,29 +64,22 @@ public class TreeAggregator {
 
   // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
   public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
-    long startTime = System.nanoTime();
-    try {
-      checkArgument(
-          step.isInputRaw(),
-          "Step in SeriesAggregateScanOperator and RawDataAggregateOperator 
can only process raw input");
-      for (InputLocation[] inputLocations : inputLocationList) {
-        Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
-        timeAndValueColumn[0] = tsBlock.getTimeColumn();
-        for (int i = 0; i < inputLocations.length; i++) {
-          checkArgument(
-              inputLocations[i].getTsBlockIndex() == 0,
-              "RawDataAggregateOperator can only process one tsBlock input.");
-          int index = inputLocations[i].getValueColumnIndex();
-          // for count_time, time column is also its value column
-          // for max_by, the input column can also be time column.
-          timeAndValueColumn[1 + i] =
-              index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
-        }
-        accumulator.addInput(timeAndValueColumn, bitMap);
+    checkArgument(
+        step.isInputRaw(),
+        "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can 
only process raw input");
+    for (InputLocation[] inputLocations : inputLocationList) {
+      Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
+      timeAndValueColumn[0] = tsBlock.getTimeColumn();
+      for (int i = 0; i < inputLocations.length; i++) {
+        checkArgument(
+            inputLocations[i].getTsBlockIndex() == 0,
+            "RawDataAggregateOperator can only process one tsBlock input.");
+        int index = inputLocations[i].getValueColumnIndex();
+        // for count_time, time column is also its value column
+        // for max_by, the input column can also be time column.
+        timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : 
tsBlock.getColumn(index);
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+      accumulator.addInput(timeAndValueColumn, bitMap);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
index 0c3fa448c64..9231b89e326 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
@@ -50,6 +50,12 @@ public class DriverContext {
     this.fragmentInstanceContext = null;
   }
 
+  @TestOnly
+  // should only be used by executeGroupByQueryInternal
+  public DriverContext(FragmentInstanceContext fragmentInstanceContext) {
+    this.fragmentInstanceContext = fragmentInstanceContext;
+  }
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext, int 
pipelineId) {
     this.fragmentInstanceContext = fragmentInstanceContext;
     this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), 
pipelineId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
new file mode 100644
index 00000000000..ea2757345dd
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FakedFragmentInstanceContext.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.FakedMemoryReservationManager;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.read.filter.basic.Filter;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FakedFragmentInstanceContext extends FragmentInstanceContext {
+
+  public FakedFragmentInstanceContext(Filter timeFilter, DataRegion 
dataRegion) {
+    super(0, new FakedMemoryReservationManager(), timeFilter, dataRegion);
+  }
+
+  public QueryDataSource getSharedQueryDataSource(IFullPath sourcePath)
+      throws QueryProcessException {
+    if (sharedQueryDataSource == null) {
+      initQueryDataSource(sourcePath);
+    }
+    return (QueryDataSource) sharedQueryDataSource;
+  }
+
+  public void initQueryDataSource(IFullPath sourcePath) throws 
QueryProcessException {
+
+    dataRegion.readLock();
+    try {
+      this.sharedQueryDataSource =
+          dataRegion.query(
+              Collections.singletonList(sourcePath),
+              sourcePath.getDeviceId(),
+              this,
+              getGlobalTimeFilter(),
+              null);
+
+      // used files should be added before mergeLock is unlocked, or they may 
be deleted by
+      // running merge
+      if (sharedQueryDataSource != null) {
+        ((QueryDataSource) sharedQueryDataSource).setSingleDevice(true);
+        List<TsFileResource> tsFileList =
+            ((QueryDataSource) sharedQueryDataSource).getSeqResources();
+        if (tsFileList != null) {
+          for (TsFileResource tsFile : tsFileList) {
+            
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
tsFile.isClosed());
+          }
+        }
+        tsFileList = ((QueryDataSource) 
sharedQueryDataSource).getUnseqResources();
+        if (tsFileList != null) {
+          for (TsFileResource tsFile : tsFileList) {
+            
FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
tsFile.isClosed());
+          }
+        }
+      }
+    } finally {
+      dataRegion.readUnlock();
+    }
+  }
+
+  public void releaseSharedQueryDataSource() {
+    if (sharedQueryDataSource != null) {
+      List<TsFileResource> tsFileList = ((QueryDataSource) 
sharedQueryDataSource).getSeqResources();
+      if (tsFileList != null) {
+        for (TsFileResource tsFile : tsFileList) {
+          FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
tsFile.isClosed());
+        }
+      }
+      tsFileList = ((QueryDataSource) 
sharedQueryDataSource).getUnseqResources();
+      if (tsFileList != null) {
+        for (TsFileResource tsFile : tsFileList) {
+          FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, 
tsFile.isClosed());
+        }
+      }
+      sharedQueryDataSource = null;
+    }
+  }
+
+  @Override
+  protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
+    return false;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 3702f8a3e45..4d0a0a07752 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationMana
 import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -83,7 +84,7 @@ public class FragmentInstanceContext extends QueryContext {
 
   private final MemoryReservationManager memoryReservationManager;
 
-  private IDataRegionForQuery dataRegion;
+  protected IDataRegionForQuery dataRegion;
   private Filter globalTimeFilter;
 
   // it will only be used once, after sharedQueryDataSource being inited, it 
will be set to null
@@ -93,7 +94,7 @@ public class FragmentInstanceContext extends QueryContext {
   private Map<IDeviceID, DeviceContext> devicePathsToContext;
 
   // Shared by all scan operators in this fragment instance to avoid memory 
problem
-  private IQueryDataSource sharedQueryDataSource;
+  protected IQueryDataSource sharedQueryDataSource;
 
   /** closed tsfile used in this fragment instance. */
   private Set<TsFileResource> closedFilePaths;
@@ -185,7 +186,7 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   public static FragmentInstanceContext 
createFragmentInstanceContextForCompaction(long queryId) {
-    return new FragmentInstanceContext(queryId);
+    return new FragmentInstanceContext(queryId, null, null, null);
   }
 
   public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) {
@@ -288,13 +289,19 @@ public class FragmentInstanceContext extends QueryContext 
{
   }
 
   // used for compaction
-  private FragmentInstanceContext(long queryId) {
+  protected FragmentInstanceContext(
+      long queryId,
+      MemoryReservationManager memoryReservationManager,
+      Filter timeFilter,
+      DataRegion dataRegion) {
     this.queryId = queryId;
     this.id = null;
     this.stateMachine = null;
     this.dataNodeQueryContextMap = null;
     this.dataNodeQueryContext = null;
-    this.memoryReservationManager = null;
+    this.dataRegion = dataRegion;
+    this.globalTimeFilter = timeFilter;
+    this.memoryReservationManager = memoryReservationManager;
   }
 
   public void start() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index b7ba7c9d14a..c2f95e39b17 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -91,7 +91,7 @@ public class QueryContext {
   }
 
   // if the mods file does not exist, do not add it to the cache
-  private boolean checkIfModificationExists(TsFileResource tsFileResource) {
+  protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
     if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) {
       return false;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
index 865af681b37..3dff2044a6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTaskId.java
@@ -36,11 +36,12 @@ public class DriverTaskId implements ID, 
Comparable<DriverTaskId> {
   // Currently, we just save pipelineId in driverTask since it's one-to-one 
relation.
   private final int pipelineId;
   private final String fullId;
+  private static final String EMPTY_FULL_ID = "EmptyFullId";
 
   public DriverTaskId(FragmentInstanceId id, int pipelineId) {
     this.fragmentInstanceId = id;
     this.pipelineId = pipelineId;
-    this.fullId = String.format("%s.%d", id.getFullId(), pipelineId);
+    this.fullId = String.format("%s.%d", id == null ? EMPTY_FULL_ID : 
id.getFullId(), pipelineId);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
new file mode 100644
index 00000000000..265ca47ca23
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+public class FakedMemoryReservationManager implements MemoryReservationManager 
{
+
+  @Override
+  public void reserveMemoryCumulatively(long size) {}
+
+  @Override
+  public void reserveMemoryImmediately() {}
+
+  @Override
+  public void releaseMemoryCumulatively(long size) {}
+
+  @Override
+  public void releaseAllReservedMemory() {}
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
index 35cedd5d1c0..a1d7b3dafbf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResourceList.java
@@ -354,7 +354,7 @@ public class TsFileResourceList implements 
List<TsFileResource> {
   }
 
   public List<TsFileResource> getArrayList() {
-    List<TsFileResource> list = new ArrayList<>();
+    List<TsFileResource> list = new ArrayList<>(count);
     TsFileResource current = header;
     while (current != null) {
       list.add(current);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index aaa319f3500..7e9a0387722 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -397,6 +397,7 @@ struct TSGroupByQueryIntervalReq {
   10: optional i64 interval
   11: optional i32 fetchSize
   12: optional i64 timeout
+  13: optional bool isAligned
 }
 
 struct TSCreateMultiTimeseriesReq {

Reply via email to