This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch fix-aggregation-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 87ac713007ab6234d844e34d91379ba3f8958c5b
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 24 14:53:55 2026 +0800

    Fix aggregation metric recording
---
 .../execution/operator/CommonOperatorContext.java  |  15 +
 .../aggregation/AggregationOperator.java           |  15 +-
 .../relational/aggregation/TableAggregator.java    |  49 +-
 .../grouped/StreamingAggregationOperator.java      |   9 +-
 .../builder/InMemoryHashAggregationBuilder.java    |   9 +-
 .../iotdb/calc/metric/QueryExecutionMetricSet.java |  15 +-
 .../execution/aggregation/TreeAggregator.java      |  92 ++-
 .../fragment/FragmentInstanceContext.java          |  53 ++
 .../execution/operator/OperatorContext.java        |  27 +
 .../operator/process/AggregationOperator.java      |  25 +-
 .../process/RawDataAggregationOperator.java        | 129 +++--
 .../operator/process/TagAggregationOperator.java   |  15 +-
 .../AbstractSeriesAggregationScanOperator.java     |  26 +-
 .../relational/AbstractAggTableScanOperator.java   |  85 +--
 .../fragment/FragmentInstanceContextTest.java      | 623 +++++++++++++++++++++
 15 files changed, 968 insertions(+), 219 deletions(-)

diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
index 2c99fa5d777..0ffe44716ce 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
@@ -103,6 +103,21 @@ public abstract class CommonOperatorContext implements 
Accountable {
     this.totalExecutionTimeInNanos += executionTimeInNanos;
   }
 
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    // calc-commons operators can run in tests or standalone contexts that are 
not backed by a
+    // DataNode FragmentInstanceContext. DataNode OperatorContext overrides 
this to forward costs.
+  }
+
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    // calc-commons operators can run in tests or standalone contexts that are 
not backed by a
+    // DataNode FragmentInstanceContext. DataNode OperatorContext overrides 
this to forward costs.
+  }
+
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    // calc-commons operators can run in tests or standalone contexts that are 
not backed by a
+    // DataNode FragmentInstanceContext. DataNode OperatorContext overrides 
this to forward costs.
+  }
+
   public void recordNextCalled() {
     this.nextCalledCount++;
   }
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
index e65ee82517d..047894fa8bc 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
@@ -92,9 +92,7 @@ public class AggregationOperator implements ProcessOperator {
         return null;
       }
 
-      for (TableAggregator aggregator : aggregators) {
-        aggregator.processBlock(block);
-      }
+      processBlock(block);
 
       return null;
     } else {
@@ -127,6 +125,17 @@ public class AggregationOperator implements 
ProcessOperator {
     return operatorContext;
   }
 
+  private void processBlock(TsBlock block) {
+    long startTime = System.nanoTime();
+    try {
+      for (TableAggregator aggregator : aggregators) {
+        aggregator.processBlock(block);
+      }
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
+    }
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     return Math.max(
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
index 1df3f6fb159..c7a7b6b7895 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.calc.execution.operator.source.relational.aggregation;
 
-import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
 
@@ -36,13 +35,9 @@ import java.util.OptionalInt;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
-import static 
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
 
 public class TableAggregator {
 
-  public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
-      QueryExecutionMetricSet.getInstance();
-
   private final TableAccumulator accumulator;
   private final AggregationNode.Step step;
   private final TSDataType outputType;
@@ -70,34 +65,28 @@ public class TableAggregator {
   }
 
   public void processBlock(TsBlock block) {
-    long startTime = System.nanoTime();
-    try {
-      Column[] arguments = block.getColumns(inputChannels);
-
-      // process count(*)
-      if (arguments.length == 0) {
-        arguments =
-            new Column[] {
-              new RunLengthEncodedColumn(
-                  CommonOperatorUtils.TIME_COLUMN_TEMPLATE, 
block.getPositionCount())
-            };
-      }
-
-      if (step.isInputRaw()) {
-        // Use select-all AggregationMask here because filter of Agg-Function 
is not supported now
-        AggregationMask mask = 
AggregationMask.createSelectAll(block.getPositionCount());
+    Column[] arguments = block.getColumns(inputChannels);
+
+    // process count(*)
+    if (arguments.length == 0) {
+      arguments =
+          new Column[] {
+            new RunLengthEncodedColumn(
+                CommonOperatorUtils.TIME_COLUMN_TEMPLATE, 
block.getPositionCount())
+          };
+    }
 
-        if (maskChannel.isPresent()) {
-          mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt()));
-        }
+    if (step.isInputRaw()) {
+      // Use select-all AggregationMask here because filter of Agg-Function is 
not supported now
+      AggregationMask mask = 
AggregationMask.createSelectAll(block.getPositionCount());
 
-        accumulator.addInput(arguments, mask);
-      } else {
-        accumulator.addIntermediate(arguments[0]);
+      if (maskChannel.isPresent()) {
+        mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt()));
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+
+      accumulator.addInput(arguments, mask);
+    } else {
+      accumulator.addIntermediate(arguments[0]);
     }
   }
 
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
index 7609ac32926..9808a1b4eed 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
@@ -176,8 +176,13 @@ public class StreamingAggregationOperator extends 
AbstractOperator {
 
   private void addRowsToAggregators(TsBlock page, int startPosition, int 
endPosition) {
     TsBlock region = page.getRegion(startPosition, endPosition - startPosition 
+ 1);
-    for (TableAggregator aggregator : aggregators) {
-      aggregator.processBlock(region);
+    long startTime = System.nanoTime();
+    try {
+      for (TableAggregator aggregator : aggregators) {
+        aggregator.processBlock(region);
+      }
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
     }
   }
 
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
index f372717a92f..07e4cb9adbb 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
@@ -140,8 +140,13 @@ public class InMemoryHashAggregationBuilder implements 
HashAggregationBuilder {
       operatorContext.recordSpecifiedInfo(MAX_GROUP_NUMBER, 
Long.toString(groupCount));
       maxGroupNumber = groupCount;
     }
-    for (GroupedAggregator groupedAggregator : groupedAggregators) {
-      groupedAggregator.processBlock(groupCount, groupByIdBlock, block);
+    long startTime = System.nanoTime();
+    try {
+      for (GroupedAggregator groupedAggregator : groupedAggregators) {
+        groupedAggregator.processBlock(groupCount, groupByIdBlock, block);
+      }
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
     }
   }
 
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
index 636b11b4cc0..5452e544509 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
@@ -143,8 +143,11 @@ public class QueryExecutionMetricSet implements IMetricSet 
{
   // region query aggregation
   public static final String AGGREGATION_FROM_RAW_DATA = 
"aggregation_from_raw_data";
   public static final String AGGREGATION_FROM_STATISTICS = 
"aggregation_from_statistics";
+  public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA =
+      "aggregation_operator_from_raw_data";
   private Timer aggregationFromRawDataTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer aggregationFromStatisticsTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer aggregationOperatorFromRawDataTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private void bindQueryAggregation(AbstractMetricService metricService) {
     aggregationFromRawDataTimer =
@@ -156,12 +159,19 @@ public class QueryExecutionMetricSet implements 
IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.FROM.toString(),
             "statistics");
+    aggregationOperatorFromRawDataTimer =
+        metricService.getOrCreateTimer(
+            Metric.AGGREGATION.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.FROM.toString(),
+            "raw_data_operator");
   }
 
   private void unbindQueryAggregation(AbstractMetricService metricService) {
     aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
-    Arrays.asList("raw_data", "statistics")
+    aggregationOperatorFromRawDataTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+    Arrays.asList("raw_data", "statistics", "raw_data_operator")
         .forEach(
             from ->
                 metricService.remove(
@@ -213,6 +223,9 @@ public class QueryExecutionMetricSet implements IMetricSet {
       case AGGREGATION_FROM_STATISTICS:
         aggregationFromStatisticsTimer.update(costTimeInNanos, 
TimeUnit.NANOSECONDS);
         break;
+      case AGGREGATION_OPERATOR_FROM_RAW_DATA:
+        aggregationOperatorFromRawDataTimer.update(costTimeInNanos, 
TimeUnit.NANOSECONDS);
+        break;
       default:
         break;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
index 07b482bbdfb..2e8650f50aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.queryengine.execution.aggregation;
 
 import org.apache.iotdb.calc.execution.aggregation.Accumulator;
-import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 
@@ -35,8 +34,6 @@ import java.util.Collections;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
-import static 
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
 
 public class TreeAggregator {
 
@@ -44,8 +41,6 @@ public class TreeAggregator {
   // In some intermediate result input, inputLocation[] should include two 
columns
   protected List<InputLocation[]> inputLocationList;
   protected final AggregationStep step;
-  public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
-      QueryExecutionMetricSet.getInstance();
 
   // Used for SeriesAggregateScanOperator
   public TreeAggregator(Accumulator accumulator, AggregationStep step) {
@@ -65,57 +60,44 @@ public class TreeAggregator {
 
   // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
   public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
-    long startTime = System.nanoTime();
-    try {
-      checkArgument(
-          step.isInputRaw(),
-          "Step in SeriesAggregateScanOperator and RawDataAggregateOperator 
can only process raw input");
-      for (InputLocation[] inputLocations : inputLocationList) {
-        Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
-        timeAndValueColumn[0] = tsBlock.getTimeColumn();
-        for (int i = 0; i < inputLocations.length; i++) {
-          checkArgument(
-              inputLocations[i].getTsBlockIndex() == 0,
-              "RawDataAggregateOperator can only process one tsBlock input.");
-          int index = inputLocations[i].getValueColumnIndex();
-          // for count_time, time column is also its value column
-          // for max_by, the input column can also be time column.
-          timeAndValueColumn[1 + i] =
-              index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
-        }
-        accumulator.addInput(timeAndValueColumn, bitMap);
+    checkArgument(
+        step.isInputRaw(),
+        "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can 
only process raw input");
+    for (InputLocation[] inputLocations : inputLocationList) {
+      Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
+      timeAndValueColumn[0] = tsBlock.getTimeColumn();
+      for (int i = 0; i < inputLocations.length; i++) {
+        checkArgument(
+            inputLocations[i].getTsBlockIndex() == 0,
+            "RawDataAggregateOperator can only process one tsBlock input.");
+        int index = inputLocations[i].getValueColumnIndex();
+        // for count_time, time column is also its value column
+        // for max_by, the input column can also be time column.
+        timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : 
tsBlock.getColumn(index);
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+      accumulator.addInput(timeAndValueColumn, bitMap);
     }
   }
 
   // Used for AggregateOperator
   public void processTsBlocks(TsBlock[] tsBlock) {
-    long startTime = System.nanoTime();
-    try {
-      checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot 
process raw input");
-      if (step.isInputFinal()) {
-        checkArgument(inputLocationList.size() == 1, "Final output can only be 
single column");
-        Column finalResult =
-            tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
-                inputLocationList.get(0)[0].getValueColumnIndex());
-        accumulator.setFinal(finalResult);
-      } else {
-        for (InputLocation[] inputLocations : inputLocationList) {
-          Column[] columns = new Column[inputLocations.length];
-          for (int i = 0; i < inputLocations.length; i++) {
-            columns[i] =
-                tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
-                    inputLocations[i].getValueColumnIndex());
-          }
-          accumulator.addIntermediate(columns);
+    checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot 
process raw input");
+    if (step.isInputFinal()) {
+      checkArgument(inputLocationList.size() == 1, "Final output can only be 
single column");
+      Column finalResult =
+          tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+              inputLocationList.get(0)[0].getValueColumnIndex());
+      accumulator.setFinal(finalResult);
+    } else {
+      for (InputLocation[] inputLocations : inputLocationList) {
+        Column[] columns = new Column[inputLocations.length];
+        for (int i = 0; i < inputLocations.length; i++) {
+          columns[i] =
+              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                  inputLocations[i].getValueColumnIndex());
         }
+        accumulator.addIntermediate(columns);
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
     }
   }
 
@@ -129,16 +111,10 @@ public class TreeAggregator {
 
   /** Used for SeriesAggregateScanOperator. */
   public void processStatistics(Statistics timeStatistics, Statistics[] 
valueStatistics) {
-    long startTime = System.nanoTime();
-    try {
-      for (InputLocation[] inputLocations : inputLocationList) {
-        int valueIndex = inputLocations[0].getValueColumnIndex();
-        // valueIndex == -1 means it is count_time, we need to use 
timeStatistics
-        accumulator.addStatistics(valueIndex == -1 ? timeStatistics : 
valueStatistics[valueIndex]);
-      }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+    for (InputLocation[] inputLocations : inputLocationList) {
+      int valueIndex = inputLocations[0].getValueColumnIndex();
+      // valueIndex == -1 means it is count_time, we need to use timeStatistics
+      accumulator.addStatistics(valueIndex == -1 ? timeStatistics : 
valueStatistics[valueIndex]);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 47eab012364..03d02456474 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.execution.fragment;
 
 import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
 import org.apache.iotdb.calc.exception.QueryProcessException;
+import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.audit.UserEntity;
@@ -86,6 +87,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static 
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static 
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA;
 import static 
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
@@ -168,6 +172,9 @@ public class FragmentInstanceContext extends QueryContext {
   private int initQueryDataSourceRetryCount = 0;
   private final AtomicLong readyQueueTime = new AtomicLong(0);
   private final AtomicLong blockQueueTime = new AtomicLong(0);
+  private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0);
+  private final AtomicLong scanAggregationFromStatisticsCost = new 
AtomicLong(0);
+  private final AtomicLong aggregationOperatorFromRawDataCost = new 
AtomicLong(0);
   private long unclosedSeqFileNum = 0;
   private long unclosedUnseqFileNum = 0;
   private long closedSeqFileNum = 0;
@@ -1117,6 +1124,8 @@ public class FragmentInstanceContext extends QueryContext 
{
 
     
QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime);
 
+    recordAggregationCostToMetric();
+
     QueryResourceMetricSet.getInstance()
         .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount());
 
@@ -1264,6 +1273,50 @@ public class FragmentInstanceContext extends 
QueryContext {
     blockQueueTime.addAndGet(time);
   }
 
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    addCost(scanAggregationFromRawDataCost, costTimeInNanos);
+  }
+
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    addCost(scanAggregationFromStatisticsCost, costTimeInNanos);
+  }
+
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    addCost(aggregationOperatorFromRawDataCost, costTimeInNanos);
+  }
+
+  private void addCost(AtomicLong cost, long costTimeInNanos) {
+    if (costTimeInNanos > 0) {
+      cost.addAndGet(costTimeInNanos);
+    }
+  }
+
+  long drainScanAggregationFromRawDataCost() {
+    return scanAggregationFromRawDataCost.getAndSet(0);
+  }
+
+  long drainScanAggregationFromStatisticsCost() {
+    return scanAggregationFromStatisticsCost.getAndSet(0);
+  }
+
+  long drainAggregationOperatorFromRawDataCost() {
+    return aggregationOperatorFromRawDataCost.getAndSet(0);
+  }
+
+  @TestOnly
+  void recordAggregationCostToMetric() {
+    recordAggregationCost(AGGREGATION_FROM_RAW_DATA, 
drainScanAggregationFromRawDataCost());
+    recordAggregationCost(AGGREGATION_FROM_STATISTICS, 
drainScanAggregationFromStatisticsCost());
+    recordAggregationCost(
+        AGGREGATION_OPERATOR_FROM_RAW_DATA, 
drainAggregationOperatorFromRawDataCost());
+  }
+
+  private void recordAggregationCost(String stage, long costTimeInNanos) {
+    if (costTimeInNanos > 0) {
+      QueryExecutionMetricSet.getInstance().recordExecutionCost(stage, 
costTimeInNanos);
+    }
+  }
+
   public long getReadyQueueTime() {
     return readyQueueTime.get();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index d9761552e28..584cb4fe74c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -71,6 +71,33 @@ public class OperatorContext extends CommonOperatorContext {
     return getInstanceContext().getSessionInfo();
   }
 
+  @Override
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != 
null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordScanAggregationFromRawDataCost(costTimeInNanos);
+    }
+  }
+
+  @Override
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != 
null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordScanAggregationFromStatisticsCost(costTimeInNanos);
+    }
+  }
+
+  @Override
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != 
null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordAggregationOperatorFromRawDataCost(costTimeInNanos);
+    }
+  }
+
   @Override
   public MemoryReservationManager getMemoryReservationContext() {
     return getInstanceContext().getMemoryReservationContext();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
index 51f6e1c8117..eec1c017430 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
@@ -154,19 +154,24 @@ public class AggregationOperator extends 
AbstractConsumeAllOperator {
 
   private void calculateNextAggregationResult() {
     // Consume current input tsBlocks
-    for (TreeAggregator aggregator : aggregators) {
-      aggregator.processTsBlocks(inputTsBlocks);
-    }
+    long startTime = System.nanoTime();
+    try {
+      for (TreeAggregator aggregator : aggregators) {
+        aggregator.processTsBlocks(inputTsBlocks);
+      }
 
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
-      if (inputTsBlocks[i].isEmpty()) {
-        inputTsBlocks[i] = null;
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
+        if (inputTsBlocks[i].isEmpty()) {
+          inputTsBlocks[i] = null;
+        }
       }
-    }
 
-    // Update result using aggregators
-    updateResultTsBlock();
+      // Update result using aggregators
+      updateResultTsBlock();
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
+    }
   }
 
   private void updateResultTsBlock() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
index 3018adfd5e1..385cc48e67c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
@@ -150,78 +150,83 @@ public class RawDataAggregationOperator extends 
SingleInputAggregationOperator {
 
   @SuppressWarnings({"squid:S3776", "squid:S135"})
   private boolean calculateFromRawData() {
-    // if window is not initialized, we should init window status and reset 
aggregators
-    if (!windowManager.isCurWindowInit() && 
!skipPreviousWindowAndInitCurWindow()) {
-      return false;
-    }
-
-    // If current window has been initialized, we should judge whether 
inputTsBlock is empty
-    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
-      return false;
-    }
-
-    if (windowManager.satisfiedCurWindow(inputTsBlock)) {
-
-      // Get the indexes in tsBlock which needs to be processed by aggregator, 
and the last row
-      // needed to be processed.
-      int tsBlockSize = inputTsBlock.getPositionCount();
-      IWindow curWindow = windowManager.getCurWindow();
-
-      Column[] controlAndTimeColumn = new Column[2];
-      controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
-      controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+    long startTime = System.nanoTime();
+    try {
+      // if window is not initialized, we should init window status and reset 
aggregators
+      if (!windowManager.isCurWindowInit() && 
!skipPreviousWindowAndInitCurWindow()) {
+        return false;
+      }
 
-      BitMap needProcess = new BitMap(tsBlockSize);
-      int lastIndexToProcess = -1;
-      boolean hasSkip = false;
+      // If current window has been initialized, we should judge whether 
inputTsBlock is empty
+      if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+        return false;
+      }
 
-      for (int i = 0; i < tsBlockSize; i++) {
-        if (windowManager.isIgnoringNull() && 
controlAndTimeColumn[0].isNull(i)) {
+      if (windowManager.satisfiedCurWindow(inputTsBlock)) {
+
+        // Get the indexes in tsBlock which needs to be processed by 
aggregator, and the last row
+        // needed to be processed.
+        int tsBlockSize = inputTsBlock.getPositionCount();
+        IWindow curWindow = windowManager.getCurWindow();
+
+        Column[] controlAndTimeColumn = new Column[2];
+        controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
+        controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+
+        BitMap needProcess = new BitMap(tsBlockSize);
+        int lastIndexToProcess = -1;
+        boolean hasSkip = false;
+
+        for (int i = 0; i < tsBlockSize; i++) {
+          if (windowManager.isIgnoringNull() && 
controlAndTimeColumn[0].isNull(i)) {
+            lastIndexToProcess = i;
+            hasSkip = true;
+            continue;
+          }
+          if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
+            break;
+          }
+          needProcess.mark(i);
+          curWindow.mergeOnePoint(controlAndTimeColumn, i);
           lastIndexToProcess = i;
-          hasSkip = true;
-          continue;
         }
-        if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
-          break;
+
+        // if no row needs to skip, just send a null parameter.
+        if (!hasSkip) {
+          needProcess = null;
         }
-        needProcess.mark(i);
-        curWindow.mergeOnePoint(controlAndTimeColumn, i);
-        lastIndexToProcess = i;
-      }
 
-      // if no row needs to skip, just send a null parameter.
-      if (!hasSkip) {
-        needProcess = null;
-      }
+        TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 
1);
+        for (TreeAggregator aggregator : aggregators) {
+          // Current agg method has been calculated
+          if (aggregator.hasFinalResult()) {
+            continue;
+          }
 
-      TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
-      for (TreeAggregator aggregator : aggregators) {
-        // Current agg method has been calculated
-        if (aggregator.hasFinalResult()) {
-          continue;
+          aggregator.processTsBlock(inputRegion, needProcess);
+        }
+        int lastReadRowIndex = lastIndexToProcess + 1;
+        // If lastReadRowIndex is not zero, some of tsBlock is consumed and 
result is cached in
+        // aggregators.
+        if (lastReadRowIndex != 0) {
+          hasCachedDataInAggregator = true;
+        }
+        if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
+          inputTsBlock = null;
+          // For the last index of TsBlock, if we can know the aggregation 
calculation is over
+          // we can directly updateResultTsBlock and return true
+          return isAllAggregatorsHasFinalResult(aggregators);
+        } else {
+          inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
+          return true;
         }
-
-        aggregator.processTsBlock(inputRegion, needProcess);
-      }
-      int lastReadRowIndex = lastIndexToProcess + 1;
-      // If lastReadRowIndex is not zero, some of tsBlock is consumed and 
result is cached in
-      // aggregators.
-      if (lastReadRowIndex != 0) {
-        hasCachedDataInAggregator = true;
-      }
-      if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
-        inputTsBlock = null;
-        // For the last index of TsBlock, if we can know the aggregation 
calculation is over
-        // we can directly updateResultTsBlock and return true
-        return isAllAggregatorsHasFinalResult(aggregators);
-      } else {
-        inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
-        return true;
       }
-    }
 
-    boolean isTsBlockOutOfBound = 
windowManager.isTsBlockOutOfBound(inputTsBlock);
-    return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound;
+      boolean isTsBlockOutOfBound = 
windowManager.isTsBlockOutOfBound(inputTsBlock);
+      return isAllAggregatorsHasFinalResult(aggregators) || 
isTsBlockOutOfBound;
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
index adab0fb1d30..afa5c4c86ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
@@ -123,12 +123,17 @@ public class TagAggregationOperator extends 
AbstractConsumeAllOperator {
   }
 
   private void aggregate(List<TreeAggregator> aggregators, TsBlock[] 
rowBlocks) {
-    for (TreeAggregator aggregator : aggregators) {
-      if (aggregator == null) {
-        continue;
+    long startTime = System.nanoTime();
+    try {
+      for (TreeAggregator aggregator : aggregators) {
+        if (aggregator == null) {
+          continue;
+        }
+        aggregator.reset();
+        aggregator.processTsBlocks(rowBlocks);
       }
-      aggregator.reset();
-      aggregator.processTsBlocks(rowBlocks);
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index c63cd24d50e..3f9db7bc165 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -245,18 +245,28 @@ public abstract class 
AbstractSeriesAggregationScanOperator extends AbstractData
   }
 
   private boolean calcFromRawData(TsBlock tsBlock) {
-    Pair<Boolean, TsBlock> calcResult =
-        calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, 
ascending);
-    inputTsBlock = calcResult.getRight();
-    return calcResult.getLeft();
+    long startTime = System.nanoTime();
+    try {
+      Pair<Boolean, TsBlock> calcResult =
+          calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, 
ascending);
+      inputTsBlock = calcResult.getRight();
+      return calcResult.getLeft();
+    } finally {
+      operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - 
startTime);
+    }
   }
 
   protected void calcFromStatistics(Statistics timeStatistics, Statistics[] 
valueStatistics) {
-    for (TreeAggregator aggregator : aggregators) {
-      if (aggregator.hasFinalResult()) {
-        continue;
+    long startTime = System.nanoTime();
+    try {
+      for (TreeAggregator aggregator : aggregators) {
+        if (aggregator.hasFinalResult()) {
+          continue;
+        }
+        aggregator.processStatistics(timeStatistics, valueStatistics);
       }
-      aggregator.processStatistics(timeStatistics, valueStatistics);
+    } finally {
+      
operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - 
startTime);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index 031c10dc8a4..856961601f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -285,28 +285,33 @@ public abstract class AbstractAggTableScanOperator 
extends AbstractDataSourceOpe
       return new Pair<>(false, inputTsBlock);
     }
 
-    updateCurTimeRange(inputTsBlock.getStartTime());
-
-    TimeRange curTimeRange = timeIterator.getCurTimeRange();
-    // check if the tsBlock does not contain points in current interval
-    if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
-      // skip points that cannot be calculated
-      if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
-          || (!ascending && inputTsBlock.getStartTime() > 
curTimeRange.getMax())) {
-        inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, 
ascending);
+    long startTime = System.nanoTime();
+    try {
+      updateCurTimeRange(inputTsBlock.getStartTime());
+
+      TimeRange curTimeRange = timeIterator.getCurTimeRange();
+      // check if the tsBlock does not contain points in current interval
+      if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
+        // skip points that cannot be calculated
+        if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
+            || (!ascending && inputTsBlock.getStartTime() > 
curTimeRange.getMax())) {
+          inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, 
ascending);
+        }
+
+        inputTsBlock = process(inputTsBlock, curTimeRange);
       }
 
-      inputTsBlock = process(inputTsBlock, curTimeRange);
+      // judge whether the calculation finished
+      boolean isTsBlockOutOfBound =
+          inputTsBlock != null
+              && (ascending
+                  ? inputTsBlock.getEndTime() > curTimeRange.getMax()
+                  : inputTsBlock.getEndTime() < curTimeRange.getMin());
+      return new Pair<>(
+          isAllAggregatorsHasFinalResult(tableAggregators) || 
isTsBlockOutOfBound, inputTsBlock);
+    } finally {
+      operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - 
startTime);
     }
-
-    // judge whether the calculation finished
-    boolean isTsBlockOutOfBound =
-        inputTsBlock != null
-            && (ascending
-                ? inputTsBlock.getEndTime() > curTimeRange.getMax()
-                : inputTsBlock.getEndTime() < curTimeRange.getMin());
-    return new Pair<>(
-        isAllAggregatorsHasFinalResult(tableAggregators) || 
isTsBlockOutOfBound, inputTsBlock);
   }
 
   private TsBlock process(TsBlock inputTsBlock, TimeRange curTimeRange) {
@@ -394,28 +399,32 @@ public abstract class AbstractAggTableScanOperator 
extends AbstractDataSourceOpe
 
   protected void calcFromStatistics(Statistics timeStatistics, Statistics[] 
valueStatistics) {
     int idx = -1;
+    long startTime = System.nanoTime();
+    try {
+      for (TableAggregator aggregator : tableAggregators) {
+        if (aggregator.hasFinalResult()) {
+          idx += aggregator.getChannelCount();
+          continue;
+        }
 
-    for (TableAggregator aggregator : tableAggregators) {
-      if (aggregator.hasFinalResult()) {
-        idx += aggregator.getChannelCount();
-        continue;
-      }
+        Statistics[] statisticsArray = new 
Statistics[aggregator.getChannelCount()];
+        for (int i = 0; i < aggregator.getChannelCount(); i++) {
+          idx++;
+
+          TsTableColumnCategory columnSchemaCategory =
+              
aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
+          statisticsArray[i] =
+              buildStatistics(
+                  columnSchemaCategory,
+                  timeStatistics,
+                  valueStatistics,
+                  aggregatorInputChannels.get(idx));
+        }
 
-      Statistics[] statisticsArray = new 
Statistics[aggregator.getChannelCount()];
-      for (int i = 0; i < aggregator.getChannelCount(); i++) {
-        idx++;
-
-        TsTableColumnCategory columnSchemaCategory =
-            
aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
-        statisticsArray[i] =
-            buildStatistics(
-                columnSchemaCategory,
-                timeStatistics,
-                valueStatistics,
-                aggregatorInputChannels.get(idx));
+        aggregator.processStatistics(statisticsArray);
       }
-
-      aggregator.processStatistics(statisticsArray);
+    } finally {
+      
operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - 
startTime);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
new file mode 100644
index 00000000000..0e621334621
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.execution.aggregation.Accumulator;
+import org.apache.iotdb.calc.execution.operator.CommonOperatorContext;
+import org.apache.iotdb.calc.execution.operator.Operator;
+import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAccumulator;
+import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator;
+import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.execution.aggregation.TreeAggregator;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.TagAggregationOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.metrics.DoNothingMetricService;
+import org.apache.iotdb.metrics.type.HistogramSnapshot;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.Test;
+
+import javax.management.ObjectName;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Collections.singletonList;
+import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class FragmentInstanceContextTest {
+
+  @Test
+  public void testDrainAggregationCostsSeparatelyOnce() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext context = 
createFragmentInstanceContext(instanceId, stateMachine);
+
+      context.recordScanAggregationFromRawDataCost(10);
+      context.recordScanAggregationFromRawDataCost(20);
+      context.recordScanAggregationFromStatisticsCost(30);
+      context.recordAggregationOperatorFromRawDataCost(40);
+
+      assertEquals(30, context.drainScanAggregationFromRawDataCost());
+      assertEquals(30, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(40, context.drainAggregationOperatorFromRawDataCost());
+
+      assertEquals(0, context.drainScanAggregationFromRawDataCost());
+      assertEquals(0, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(0, context.drainAggregationOperatorFromRawDataCost());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void 
testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("forward"), 
"forward");
+
+      operatorContext.recordScanAggregationFromRawDataCost(11);
+      operatorContext.recordScanAggregationFromStatisticsCost(13);
+      operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+      assertEquals(11, context.drainScanAggregationFromRawDataCost());
+      assertEquals(13, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(17, context.drainAggregationOperatorFromRawDataCost());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void 
testOperatorContextWithoutFragmentInstanceAndCommonContextUseNoOpFallback() {
+    OperatorContext operatorContext =
+        new OperatorContext(1, new PlanNodeId("no-op"), "no-op", new 
DriverContext());
+    operatorContext.recordScanAggregationFromRawDataCost(11);
+    operatorContext.recordScanAggregationFromStatisticsCost(13);
+    operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+    TestCommonOperatorContext commonOperatorContext = new 
TestCommonOperatorContext();
+    commonOperatorContext.recordScanAggregationFromRawDataCost(19);
+    commonOperatorContext.recordScanAggregationFromStatisticsCost(23);
+    commonOperatorContext.recordAggregationOperatorFromRawDataCost(29);
+
+    assertEquals(0, commonOperatorContext.getTotalExecutionTimeInNanos());
+    assertNull(commonOperatorContext.getMemoryReservationContext());
+  }
+
+  @Test
+  public void testAggregationCostsFromOperatorsFlushToMetricStages() throws 
Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    RecordingMetricService metricService = new RecordingMetricService();
+    QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance();
+    metricSet.bindTo(metricService);
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext scanOperatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("scan"), "scan");
+      TestSeriesAggregationScanOperator scanOperator =
+          new TestSeriesAggregationScanOperator(scanOperatorContext);
+
+      scanOperator.calculateRaw(longTsBlock());
+      scanOperator.calculateStatistics(
+          Statistics.getStatsByType(TSDataType.INT64),
+          new Statistics[] {Statistics.getStatsByType(TSDataType.INT64)});
+
+      OperatorContext aggregationOperatorContext =
+          driverContext.addOperatorContext(2, new PlanNodeId("aggregation"), 
"aggregation");
+      
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator
+          aggregationOperator = 
newAggregationOperator(aggregationOperatorContext);
+      aggregationOperator.next();
+
+      context.recordAggregationCostToMetric();
+
+      assertEquals(1, metricService.count("raw_data"));
+      assertTrue(metricService.sum("raw_data") > 0);
+      assertEquals(1, metricService.count("statistics"));
+      assertTrue(metricService.sum("statistics") > 0);
+      assertEquals(1, metricService.count("raw_data_operator"));
+      assertTrue(metricService.sum("raw_data_operator") > 0);
+
+      context.recordAggregationCostToMetric();
+
+      assertEquals(1, metricService.count("raw_data"));
+      assertEquals(1, metricService.count("statistics"));
+      assertEquals(1, metricService.count("raw_data_operator"));
+    } finally {
+      metricSet.unbindFrom(metricService);
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testAggregationOperatorRecordsTsBlockCostForIntermediateInput() 
throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("aggregation"), 
"aggregation");
+      
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator
+          aggregationOperator = newAggregationOperator(operatorContext);
+
+      assertNull(aggregationOperator.next());
+
+      assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testTreeAggregationOperatorRecordsTsBlockCost() throws Exception 
{
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new 
PlanNodeId("tree-aggregation"), "aggregation");
+      
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator
+          aggregationOperator =
+              new 
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator(
+                  operatorContext,
+                  singletonList(newTreeFinalAggregator()),
+                  new SingleTimeRangeIterator(new TimeRange(0, 10)),
+                  singletonList(new SingleTsBlockOperator(operatorContext, 
longTsBlock())),
+                  false,
+                  1024);
+
+      aggregationOperator.isBlocked().get();
+      aggregationOperator.next();
+
+      assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testTagAggregationOperatorRecordsTsBlockCost() throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new 
PlanNodeId("tag-aggregation"), "tag-aggregation");
+      TagAggregationOperator tagAggregationOperator =
+          new TagAggregationOperator(
+              operatorContext,
+              singletonList(singletonList("tag")),
+              singletonList(singletonList(newTreeFinalAggregator())),
+              singletonList(new SingleTsBlockOperator(operatorContext, 
longTsBlock())),
+              1024);
+
+      tagAggregationOperator.isBlocked().get();
+      tagAggregationOperator.next();
+
+      assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  private static FragmentInstanceContext newFragmentInstanceContext(
+      ExecutorService instanceNotificationExecutor) {
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    return createFragmentInstanceContext(instanceId, stateMachine);
+  }
+
+  private static TsBlock longTsBlock() {
+    TsBlockBuilder builder = new 
TsBlockBuilder(singletonList(TSDataType.INT64));
+    builder.getTimeColumnBuilder().writeLong(0);
+    builder.getColumnBuilder(0).writeLong(1);
+    builder.declarePosition();
+    return builder.build();
+  }
+
+  private static 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation
+          .AggregationOperator
+      newAggregationOperator(OperatorContext operatorContext) {
+    TableAggregator aggregator =
+        new TableAggregator(
+            new TestTableAccumulator(),
+            AggregationNode.Step.INTERMEDIATE,
+            TSDataType.INT64,
+            singletonList(0),
+            OptionalInt.empty());
+    return new 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation
+        .AggregationOperator(
+        operatorContext,
+        new SingleTsBlockOperator(operatorContext, longTsBlock()),
+        singletonList(aggregator));
+  }
+
+  private static TreeAggregator newTreeFinalAggregator() {
+    return new TreeAggregator(
+        new TestTreeAccumulator(),
+        AggregationStep.FINAL,
+        singletonList(new InputLocation[] {new InputLocation(0, 0)}));
+  }
+
+  private static class TestSeriesAggregationScanOperator
+      extends AbstractSeriesAggregationScanOperator {
+
+    private TestSeriesAggregationScanOperator(OperatorContext operatorContext) 
{
+      super(
+          new PlanNodeId("scan"),
+          operatorContext,
+          null,
+          1,
+          singletonList(new TreeAggregator(new TestTreeAccumulator(), 
AggregationStep.SINGLE)),
+          new SingleTimeRangeIterator(new TimeRange(0, 10)),
+          true,
+          false,
+          null,
+          1024,
+          0,
+          true);
+      this.curTimeRange = new TimeRange(0, 10);
+    }
+
+    private void calculateRaw(TsBlock tsBlock) {
+      this.inputTsBlock = tsBlock;
+      calcFromCachedData();
+    }
+
+    private void calculateStatistics(Statistics timeStatistics, Statistics[] 
valueStatistics) {
+      calcFromStatistics(timeStatistics, valueStatistics);
+    }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
+  }
+
+  private static class SingleTimeRangeIterator implements ITimeRangeIterator {
+    private final TimeRange timeRange;
+    private boolean consumed;
+
+    private SingleTimeRangeIterator(TimeRange timeRange) {
+      this.timeRange = timeRange;
+    }
+
+    @Override
+    public TimeRange getFirstTimeRange() {
+      return timeRange;
+    }
+
+    @Override
+    public boolean hasNextTimeRange() {
+      return !consumed;
+    }
+
+    @Override
+    public TimeRange nextTimeRange() {
+      consumed = true;
+      return timeRange;
+    }
+
+    @Override
+    public boolean isAscending() {
+      return true;
+    }
+
+    @Override
+    public long currentOutputTime() {
+      return timeRange.getMin();
+    }
+
+    @Override
+    public long getTotalIntervalNum() {
+      return 1;
+    }
+  }
+
+  private static class RecordingMetricService extends DoNothingMetricService {
+    private final Map<String, RecordingTimer> timers = new HashMap<>();
+
+    @Override
+    public Timer getOrCreateTimer(String metric, MetricLevel metricLevel, 
String... tags) {
+      if (!Metric.AGGREGATION.toString().equals(metric)
+          || tags.length != 2
+          || !Tag.FROM.toString().equals(tags[0])) {
+        return super.getOrCreateTimer(metric, metricLevel, tags);
+      }
+      return timers.computeIfAbsent(tags[1], key -> new RecordingTimer());
+    }
+
+    @Override
+    public void remove(MetricType type, String metric, String... tags) {
+      if (Metric.AGGREGATION.toString().equals(metric)
+          && tags.length == 2
+          && Tag.FROM.toString().equals(tags[0])) {
+        timers.remove(tags[1]);
+      }
+    }
+
+    long count(String from) {
+      RecordingTimer timer = timers.get(from);
+      return timer == null ? 0 : timer.getCount();
+    }
+
+    long sum(String from) {
+      RecordingTimer timer = timers.get(from);
+      return timer == null ? 0 : timer.sum.get();
+    }
+  }
+
+  private static class RecordingTimer implements Timer {
+    private final AtomicLong count = new AtomicLong();
+    private final AtomicLong sum = new AtomicLong();
+
+    @Override
+    public void update(long duration, TimeUnit unit) {
+      count.incrementAndGet();
+      sum.addAndGet(unit.toNanos(duration));
+    }
+
+    @Override
+    public HistogramSnapshot takeSnapshot() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getCount() {
+      return count.get();
+    }
+
+    @Override
+    public void setObjectName(ObjectName objectName) {
+      // no-op
+    }
+  }
+
+  private static class SingleTsBlockOperator implements Operator {
+    private final CommonOperatorContext operatorContext;
+    private final TsBlock tsBlock;
+    private boolean consumed;
+
+    private SingleTsBlockOperator(CommonOperatorContext operatorContext, 
TsBlock tsBlock) {
+      this.operatorContext = operatorContext;
+      this.tsBlock = tsBlock;
+    }
+
+    @Override
+    public CommonOperatorContext getOperatorContext() {
+      return operatorContext;
+    }
+
+    @Override
+    public TsBlock next() {
+      consumed = true;
+      return tsBlock;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !consumed;
+    }
+
+    @Override
+    public void close() {
+      // no-op
+    }
+
+    @Override
+    public boolean isFinished() {
+      return consumed;
+    }
+
+    @Override
+    public long calculateMaxPeekMemory() {
+      return 0;
+    }
+
+    @Override
+    public long calculateMaxReturnSize() {
+      return 0;
+    }
+
+    @Override
+    public long calculateRetainedSizeAfterCallingNext() {
+      return 0;
+    }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
+  }
+
+  private static class TestTableAccumulator implements TableAccumulator {
+    @Override
+    public long getEstimatedSize() {
+      return 0;
+    }
+
+    @Override
+    public TableAccumulator copy() {
+      return new TestTableAccumulator();
+    }
+
+    @Override
+    public void addInput(
+        Column[] arguments,
+        
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationMask
+            mask) {
+      // no-op
+    }
+
+    @Override
+    public void addIntermediate(Column argument) {
+      // no-op
+    }
+
+    @Override
+    public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+      columnBuilder.writeLong(0);
+    }
+
+    @Override
+    public void evaluateFinal(ColumnBuilder columnBuilder) {
+      columnBuilder.writeLong(0);
+    }
+
+    @Override
+    public boolean hasFinalResult() {
+      return false;
+    }
+
+    @Override
+    public void addStatistics(Statistics[] statistics) {
+      // no-op
+    }
+
+    @Override
+    public void reset() {
+      // no-op
+    }
+  }
+
+  private static class TestTreeAccumulator implements Accumulator {
+    @Override
+    public void addInput(Column[] columns, org.apache.tsfile.utils.BitMap 
bitMap) {
+      // no-op
+    }
+
+    @Override
+    public void addIntermediate(Column[] partialResult) {
+      // no-op
+    }
+
+    @Override
+    public void addStatistics(Statistics statistics) {
+      // no-op
+    }
+
+    @Override
+    public void setFinal(Column finalResult) {
+      // no-op
+    }
+
+    @Override
+    public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {
+      tsBlockBuilder[0].writeLong(0);
+    }
+
+    @Override
+    public void outputFinal(ColumnBuilder tsBlockBuilder) {
+      tsBlockBuilder.writeLong(0);
+    }
+
+    @Override
+    public void reset() {
+      // no-op
+    }
+
+    @Override
+    public boolean hasFinalResult() {
+      return false;
+    }
+
+    @Override
+    public TSDataType[] getIntermediateType() {
+      return new TSDataType[] {TSDataType.INT64};
+    }
+
+    @Override
+    public TSDataType getFinalType() {
+      return TSDataType.INT64;
+    }
+  }
+
+  private static class TestCommonOperatorContext extends CommonOperatorContext 
{
+    private TestCommonOperatorContext() {
+      super(0, new PlanNodeId("common"), "common");
+    }
+
+    @Override
+    public MemoryReservationManager getMemoryReservationContext() {
+      return null;
+    }
+
+    @Override
+    public int getFragmentId() {
+      return 0;
+    }
+
+    @Override
+    public int getPipelineId() {
+      return 0;
+    }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
+  }
+}

Reply via email to