This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 818b6fc5581 [To dev/1.3] Optimize aggregation metric recording
818b6fc5581 is described below

commit 818b6fc5581ca47d4c3858956e63e24a3c8bb641
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 25 09:10:36 2026 +0800

    [To dev/1.3] Optimize aggregation metric recording
---
 .../execution/aggregation/Aggregator.java          |  92 ++++------
 .../fragment/FragmentInstanceContext.java          |  52 ++++++
 .../execution/operator/OperatorContext.java        |  24 +++
 .../operator/process/AggregationOperator.java      |  25 +--
 .../process/RawDataAggregationOperator.java        | 129 ++++++-------
 .../operator/process/TagAggregationOperator.java   |  15 +-
 .../AbstractSeriesAggregationScanOperator.java     |  26 ++-
 .../metric/QueryExecutionMetricSet.java            |  15 +-
 .../fragment/FragmentInstanceContextTest.java      | 200 +++++++++++++++++++++
 9 files changed, 434 insertions(+), 144 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
index 6992d80b53f..5661f54a85a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.execution.aggregation;
 
-import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 
@@ -34,8 +33,6 @@ import java.util.Collections;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
-import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
 
 public class Aggregator {
 
@@ -43,8 +40,6 @@ public class Aggregator {
   // In some intermediate result input, inputLocation[] should include two 
columns
   protected List<InputLocation[]> inputLocationList;
   protected final AggregationStep step;
-  protected static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
-      QueryExecutionMetricSet.getInstance();
 
   // Used for SeriesAggregateScanOperator
   public Aggregator(Accumulator accumulator, AggregationStep step) {
@@ -64,57 +59,44 @@ public class Aggregator {
 
   // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
   public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
-    long startTime = System.nanoTime();
-    try {
-      checkArgument(
-          step.isInputRaw(),
-          "Step in SeriesAggregateScanOperator and RawDataAggregateOperator 
can only process raw input");
-      for (InputLocation[] inputLocations : inputLocationList) {
-        Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
-        timeAndValueColumn[0] = tsBlock.getTimeColumn();
-        for (int i = 0; i < inputLocations.length; i++) {
-          checkArgument(
-              inputLocations[i].getTsBlockIndex() == 0,
-              "RawDataAggregateOperator can only process one tsBlock input.");
-          int index = inputLocations[i].getValueColumnIndex();
-          // for count_time, time column is also its value column
-          // for max_by, the input column can also be time column.
-          timeAndValueColumn[1 + i] =
-              index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
-        }
-        accumulator.addInput(timeAndValueColumn, bitMap);
+    checkArgument(
+        step.isInputRaw(),
+        "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can 
only process raw input");
+    for (InputLocation[] inputLocations : inputLocationList) {
+      Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
+      timeAndValueColumn[0] = tsBlock.getTimeColumn();
+      for (int i = 0; i < inputLocations.length; i++) {
+        checkArgument(
+            inputLocations[i].getTsBlockIndex() == 0,
+            "RawDataAggregateOperator can only process one tsBlock input.");
+        int index = inputLocations[i].getValueColumnIndex();
+        // for count_time, time column is also its value column
+        // for max_by, the input column can also be time column.
+        timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : 
tsBlock.getColumn(index);
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+      accumulator.addInput(timeAndValueColumn, bitMap);
     }
   }
 
   // Used for AggregateOperator
   public void processTsBlocks(TsBlock[] tsBlock) {
-    long startTime = System.nanoTime();
-    try {
-      checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot 
process raw input");
-      if (step.isInputFinal()) {
-        checkArgument(inputLocationList.size() == 1, "Final output can only be 
single column");
-        Column finalResult =
-            tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
-                inputLocationList.get(0)[0].getValueColumnIndex());
-        accumulator.setFinal(finalResult);
-      } else {
-        for (InputLocation[] inputLocations : inputLocationList) {
-          Column[] columns = new Column[inputLocations.length];
-          for (int i = 0; i < inputLocations.length; i++) {
-            columns[i] =
-                tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
-                    inputLocations[i].getValueColumnIndex());
-          }
-          accumulator.addIntermediate(columns);
+    checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot 
process raw input");
+    if (step.isInputFinal()) {
+      checkArgument(inputLocationList.size() == 1, "Final output can only be 
single column");
+      Column finalResult =
+          tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+              inputLocationList.get(0)[0].getValueColumnIndex());
+      accumulator.setFinal(finalResult);
+    } else {
+      for (InputLocation[] inputLocations : inputLocationList) {
+        Column[] columns = new Column[inputLocations.length];
+        for (int i = 0; i < inputLocations.length; i++) {
+          columns[i] =
+              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                  inputLocations[i].getValueColumnIndex());
         }
+        accumulator.addIntermediate(columns);
       }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
     }
   }
 
@@ -128,16 +110,10 @@ public class Aggregator {
 
   /** Used for SeriesAggregateScanOperator. */
   public void processStatistics(Statistics timeStatistics, Statistics[] 
valueStatistics) {
-    long startTime = System.nanoTime();
-    try {
-      for (InputLocation[] inputLocations : inputLocationList) {
-        int valueIndex = inputLocations[0].getValueColumnIndex();
-        // valueIndex == -1 means it is count_time, we need to use 
timeStatistics
-        accumulator.addStatistics(valueIndex == -1 ? timeStatistics : 
valueStatistics[valueIndex]);
-      }
-    } finally {
-      QUERY_EXECUTION_METRICS.recordExecutionCost(
-          AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+    for (InputLocation[] inputLocations : inputLocationList) {
+      int valueIndex = inputLocations[0].getValueColumnIndex();
+      // valueIndex == -1 means it is count_time, we need to use timeStatistics
+      accumulator.addStatistics(valueIndex == -1 ? timeStatistics : 
valueStatistics[valueIndex]);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index a224e5b2c35..d579b7e15c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
+import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
@@ -77,6 +78,9 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
+import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA;
 import static 
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE;
 import static 
org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE;
 
@@ -149,6 +153,9 @@ public class FragmentInstanceContext extends QueryContext {
   private int initQueryDataSourceRetryCount = 0;
   private final AtomicLong readyQueueTime = new AtomicLong(0);
   private final AtomicLong blockQueueTime = new AtomicLong(0);
+  private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0);
+  private final AtomicLong scanAggregationFromStatisticsCost = new 
AtomicLong(0);
+  private final AtomicLong aggregationOperatorFromRawDataCost = new 
AtomicLong(0);
   private long unclosedSeqFileNum = 0;
   private long unclosedUnseqFileNum = 0;
   private long closedSeqFileNum = 0;
@@ -970,6 +977,8 @@ public class FragmentInstanceContext extends QueryContext {
 
     
QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime);
 
+    recordAggregationCostToMetric();
+
     QueryResourceMetricSet.getInstance()
         .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount());
 
@@ -1115,6 +1124,49 @@ public class FragmentInstanceContext extends QueryContext {
     blockQueueTime.addAndGet(time);
   }
 
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    addCost(scanAggregationFromRawDataCost, costTimeInNanos);
+  }
+
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    addCost(scanAggregationFromStatisticsCost, costTimeInNanos);
+  }
+
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    addCost(aggregationOperatorFromRawDataCost, costTimeInNanos);
+  }
+
+  private void addCost(AtomicLong cost, long costTimeInNanos) {
+    if (costTimeInNanos > 0) {
+      cost.addAndGet(costTimeInNanos);
+    }
+  }
+
+  long drainScanAggregationFromRawDataCost() {
+    return scanAggregationFromRawDataCost.getAndSet(0);
+  }
+
+  long drainScanAggregationFromStatisticsCost() {
+    return scanAggregationFromStatisticsCost.getAndSet(0);
+  }
+
+  long drainAggregationOperatorFromRawDataCost() {
+    return aggregationOperatorFromRawDataCost.getAndSet(0);
+  }
+
+  void recordAggregationCostToMetric() {
+    recordAggregationCost(AGGREGATION_FROM_RAW_DATA, 
drainScanAggregationFromRawDataCost());
+    recordAggregationCost(AGGREGATION_FROM_STATISTICS, 
drainScanAggregationFromStatisticsCost());
+    recordAggregationCost(
+        AGGREGATION_OPERATOR_FROM_RAW_DATA, 
drainAggregationOperatorFromRawDataCost());
+  }
+
+  private void recordAggregationCost(String stage, long costTimeInNanos) {
+    if (costTimeInNanos > 0) {
+      QueryExecutionMetricSet.getInstance().recordExecutionCost(stage, 
costTimeInNanos);
+    }
+  }
+
   public long getReadyQueueTime() {
     return readyQueueTime.get();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index 49be9ad1cc0..90bded8a0ee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -119,6 +119,30 @@ public class OperatorContext implements Accountable {
     return getInstanceContext().getSessionInfo();
   }
 
+  public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != 
null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordScanAggregationFromRawDataCost(costTimeInNanos);
+    }
+  }
+
+  public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != 
null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordScanAggregationFromStatisticsCost(costTimeInNanos);
+    }
+  }
+
+  public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+    if (driverContext != null && driverContext.getFragmentInstanceContext() != 
null) {
+      driverContext
+          .getFragmentInstanceContext()
+          .recordAggregationOperatorFromRawDataCost(costTimeInNanos);
+    }
+  }
+
   public PlanNodeId getPlanNodeId() {
     return planNodeId;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
index ff1de243b27..ae36cc39c6b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
@@ -153,19 +153,24 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
 
   private void calculateNextAggregationResult() {
     // Consume current input tsBlocks
-    for (Aggregator aggregator : aggregators) {
-      aggregator.processTsBlocks(inputTsBlocks);
-    }
+    long startTime = System.nanoTime();
+    try {
+      for (Aggregator aggregator : aggregators) {
+        aggregator.processTsBlocks(inputTsBlocks);
+      }
 
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
-      if (inputTsBlocks[i].isEmpty()) {
-        inputTsBlocks[i] = null;
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
+        if (inputTsBlocks[i].isEmpty()) {
+          inputTsBlocks[i] = null;
+        }
       }
-    }
 
-    // Update result using aggregators
-    updateResultTsBlock();
+      // Update result using aggregators
+      updateResultTsBlock();
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
+    }
   }
 
   private void updateResultTsBlock() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
index 69f67503cc4..3ca36ff7d67 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
@@ -150,78 +150,83 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
 
   @SuppressWarnings({"squid:S3776", "squid:S135"})
   private boolean calculateFromRawData() {
-    // if window is not initialized, we should init window status and reset 
aggregators
-    if (!windowManager.isCurWindowInit() && 
!skipPreviousWindowAndInitCurWindow()) {
-      return false;
-    }
-
-    // If current window has been initialized, we should judge whether 
inputTsBlock is empty
-    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
-      return false;
-    }
-
-    if (windowManager.satisfiedCurWindow(inputTsBlock)) {
-
-      // Get the indexes in tsBlock which needs to be processed by aggregator, 
and the last row
-      // needed to be processed.
-      int tsBlockSize = inputTsBlock.getPositionCount();
-      IWindow curWindow = windowManager.getCurWindow();
-
-      Column[] controlAndTimeColumn = new Column[2];
-      controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
-      controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+    long startTime = System.nanoTime();
+    try {
+      // if window is not initialized, we should init window status and reset 
aggregators
+      if (!windowManager.isCurWindowInit() && 
!skipPreviousWindowAndInitCurWindow()) {
+        return false;
+      }
 
-      BitMap needProcess = new BitMap(tsBlockSize);
-      int lastIndexToProcess = -1;
-      boolean hasSkip = false;
+      // If current window has been initialized, we should judge whether 
inputTsBlock is empty
+      if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+        return false;
+      }
 
-      for (int i = 0; i < tsBlockSize; i++) {
-        if (windowManager.isIgnoringNull() && 
controlAndTimeColumn[0].isNull(i)) {
+      if (windowManager.satisfiedCurWindow(inputTsBlock)) {
+
+        // Get the indexes in tsBlock which needs to be processed by 
aggregator, and the last row
+        // needed to be processed.
+        int tsBlockSize = inputTsBlock.getPositionCount();
+        IWindow curWindow = windowManager.getCurWindow();
+
+        Column[] controlAndTimeColumn = new Column[2];
+        controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
+        controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+
+        BitMap needProcess = new BitMap(tsBlockSize);
+        int lastIndexToProcess = -1;
+        boolean hasSkip = false;
+
+        for (int i = 0; i < tsBlockSize; i++) {
+          if (windowManager.isIgnoringNull() && 
controlAndTimeColumn[0].isNull(i)) {
+            lastIndexToProcess = i;
+            hasSkip = true;
+            continue;
+          }
+          if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
+            break;
+          }
+          needProcess.mark(i);
+          curWindow.mergeOnePoint(controlAndTimeColumn, i);
           lastIndexToProcess = i;
-          hasSkip = true;
-          continue;
         }
-        if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
-          break;
+
+        // if no row needs to skip, just send a null parameter.
+        if (!hasSkip) {
+          needProcess = null;
         }
-        needProcess.mark(i);
-        curWindow.mergeOnePoint(controlAndTimeColumn, i);
-        lastIndexToProcess = i;
-      }
 
-      // if no row needs to skip, just send a null parameter.
-      if (!hasSkip) {
-        needProcess = null;
-      }
+        TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 
1);
+        for (Aggregator aggregator : aggregators) {
+          // Current agg method has been calculated
+          if (aggregator.hasFinalResult()) {
+            continue;
+          }
 
-      TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
-      for (Aggregator aggregator : aggregators) {
-        // Current agg method has been calculated
-        if (aggregator.hasFinalResult()) {
-          continue;
+          aggregator.processTsBlock(inputRegion, needProcess);
+        }
+        int lastReadRowIndex = lastIndexToProcess + 1;
+        // If lastReadRowIndex is not zero, some of tsBlock is consumed and 
result is cached in
+        // aggregators.
+        if (lastReadRowIndex != 0) {
+          hasCachedDataInAggregator = true;
+        }
+        if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
+          inputTsBlock = null;
+          // For the last index of TsBlock, if we can know the aggregation 
calculation is over
+          // we can directly updateResultTsBlock and return true
+          return isAllAggregatorsHasFinalResult(aggregators);
+        } else {
+          inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
+          return true;
         }
-
-        aggregator.processTsBlock(inputRegion, needProcess);
-      }
-      int lastReadRowIndex = lastIndexToProcess + 1;
-      // If lastReadRowIndex is not zero, some of tsBlock is consumed and 
result is cached in
-      // aggregators.
-      if (lastReadRowIndex != 0) {
-        hasCachedDataInAggregator = true;
-      }
-      if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
-        inputTsBlock = null;
-        // For the last index of TsBlock, if we can know the aggregation 
calculation is over
-        // we can directly updateResultTsBlock and return true
-        return isAllAggregatorsHasFinalResult(aggregators);
-      } else {
-        inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
-        return true;
       }
-    }
 
-    boolean isTsBlockOutOfBound = 
windowManager.isTsBlockOutOfBound(inputTsBlock);
-    return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound;
+      boolean isTsBlockOutOfBound = 
windowManager.isTsBlockOutOfBound(inputTsBlock);
+      return isAllAggregatorsHasFinalResult(aggregators) || 
isTsBlockOutOfBound;
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
+    }
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
index d7399fb4554..243b9ee0744 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
@@ -122,12 +122,17 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
   }
 
   private void aggregate(List<Aggregator> aggregators, TsBlock[] rowBlocks) {
-    for (Aggregator aggregator : aggregators) {
-      if (aggregator == null) {
-        continue;
+    long startTime = System.nanoTime();
+    try {
+      for (Aggregator aggregator : aggregators) {
+        if (aggregator == null) {
+          continue;
+        }
+        aggregator.reset();
+        aggregator.processTsBlocks(rowBlocks);
       }
-      aggregator.reset();
-      aggregator.processTsBlocks(rowBlocks);
+    } finally {
+      
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - 
startTime);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index fe24513ae43..0d69b933ee7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -244,18 +244,28 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
   }
 
   private boolean calcFromRawData(TsBlock tsBlock) {
-    Pair<Boolean, TsBlock> calcResult =
-        calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, 
ascending);
-    inputTsBlock = calcResult.getRight();
-    return calcResult.getLeft();
+    long startTime = System.nanoTime();
+    try {
+      Pair<Boolean, TsBlock> calcResult =
+          calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, 
ascending);
+      inputTsBlock = calcResult.getRight();
+      return calcResult.getLeft();
+    } finally {
+      operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - 
startTime);
+    }
   }
 
   protected void calcFromStatistics(Statistics timeStatistics, Statistics[] 
valueStatistics) {
-    for (Aggregator aggregator : aggregators) {
-      if (aggregator.hasFinalResult()) {
-        continue;
+    long startTime = System.nanoTime();
+    try {
+      for (Aggregator aggregator : aggregators) {
+        if (aggregator.hasFinalResult()) {
+          continue;
+        }
+        aggregator.processStatistics(timeStatistics, valueStatistics);
       }
-      aggregator.processStatistics(timeStatistics, valueStatistics);
+    } finally {
+      
operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - 
startTime);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java
index 27a238d3d8c..8766dc60624 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java
@@ -143,8 +143,11 @@ public class QueryExecutionMetricSet implements IMetricSet {
   // region query aggregation
   public static final String AGGREGATION_FROM_RAW_DATA = 
"aggregation_from_raw_data";
   public static final String AGGREGATION_FROM_STATISTICS = 
"aggregation_from_statistics";
+  public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA =
+      "aggregation_operator_from_raw_data";
   private Timer aggregationFromRawDataTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer aggregationFromStatisticsTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer aggregationOperatorFromRawDataTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private void bindQueryAggregation(AbstractMetricService metricService) {
     aggregationFromRawDataTimer =
@@ -156,12 +159,19 @@ public class QueryExecutionMetricSet implements IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.FROM.toString(),
             "statistics");
+    aggregationOperatorFromRawDataTimer =
+        metricService.getOrCreateTimer(
+            Metric.AGGREGATION.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.FROM.toString(),
+            "raw_data_operator");
   }
 
   private void unbindQueryAggregation(AbstractMetricService metricService) {
     aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
-    Arrays.asList("raw_data", "statistics")
+    aggregationOperatorFromRawDataTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+    Arrays.asList("raw_data", "statistics", "raw_data_operator")
         .forEach(
             from ->
                 metricService.remove(
@@ -213,6 +223,9 @@ public class QueryExecutionMetricSet implements IMetricSet {
       case AGGREGATION_FROM_STATISTICS:
         aggregationFromStatisticsTimer.update(costTimeInNanos, 
TimeUnit.NANOSECONDS);
         break;
+      case AGGREGATION_OPERATOR_FROM_RAW_DATA:
+        aggregationOperatorFromRawDataTimer.update(costTimeInNanos, 
TimeUnit.NANOSECONDS);
+        break;
       default:
         break;
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
new file mode 100644
index 00000000000..6f085e6235b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.metrics.DoNothingMetricService;
+import org.apache.iotdb.metrics.type.HistogramSnapshot;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.junit.Test;
+
+import javax.management.ObjectName;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the aggregation-cost bookkeeping exposed by {@link FragmentInstanceContext}: the
+ * {@code record*Cost} / {@code drain*Cost} method pairs, the forwarding of recorded costs from
+ * {@link OperatorContext}, and the flush of accumulated costs into the aggregation timers of
+ * {@link QueryExecutionMetricSet}.
+ */
+public class FragmentInstanceContextTest {
+
+  /**
+   * Records costs in the three categories (scan from raw data, scan from statistics, aggregation
+   * operator from raw data) and checks that each drain returns the total recorded for its own
+   * category, and that draining a second time returns 0.
+   */
+  @Test
+  public void testDrainAggregationCostsSeparatelyOnce() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+
+      // Two recordings in the same category are expected to add up (10 + 20 = 30).
+      context.recordScanAggregationFromRawDataCost(10);
+      context.recordScanAggregationFromRawDataCost(20);
+      context.recordScanAggregationFromStatisticsCost(30);
+      context.recordAggregationOperatorFromRawDataCost(40);
+
+      assertEquals(30, context.drainScanAggregationFromRawDataCost());
+      assertEquals(30, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(40, context.drainAggregationOperatorFromRawDataCost());
+
+      // A second drain must observe nothing: the first drain is expected to reset the value.
+      assertEquals(0, context.drainScanAggregationFromRawDataCost());
+      assertEquals(0, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(0, context.drainAggregationOperatorFromRawDataCost());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Records costs through an {@link OperatorContext} obtained from a {@link DriverContext} built
+   * on the fragment instance context, and checks that draining the fragment instance context
+   * returns exactly the recorded values.
+   */
+  @Test
+  public void 
testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+      DriverContext driverContext = new DriverContext(context, 0);
+      OperatorContext operatorContext =
+          driverContext.addOperatorContext(1, new PlanNodeId("aggregation"), 
"aggregation");
+
+      operatorContext.recordScanAggregationFromRawDataCost(11);
+      operatorContext.recordScanAggregationFromStatisticsCost(13);
+      operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+      assertEquals(11, context.drainScanAggregationFromRawDataCost());
+      assertEquals(13, context.drainScanAggregationFromStatisticsCost());
+      assertEquals(17, context.drainAggregationOperatorFromRawDataCost());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Binds the {@link QueryExecutionMetricSet} singleton to a {@link RecordingMetricService},
+   * records one cost per category, and checks that {@code recordAggregationCostToMetric()} updates
+   * each of the timers tagged {@code "raw_data"}, {@code "statistics"} and {@code
+   * "raw_data_operator"} exactly once with the recorded value. Calling it again must not produce
+   * further timer updates.
+   */
+  @Test
+  public void testAggregationCostsFlushToMetricStagesOnce() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    RecordingMetricService metricService = new RecordingMetricService();
+    QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance();
+    metricSet.bindTo(metricService);
+    try {
+      FragmentInstanceContext context = 
newFragmentInstanceContext(instanceNotificationExecutor);
+
+      context.recordScanAggregationFromRawDataCost(10);
+      context.recordScanAggregationFromStatisticsCost(20);
+      context.recordAggregationOperatorFromRawDataCost(30);
+
+      context.recordAggregationCostToMetric();
+
+      assertEquals(1, metricService.count("raw_data"));
+      assertEquals(10, metricService.sum("raw_data"));
+      assertEquals(1, metricService.count("statistics"));
+      assertEquals(20, metricService.sum("statistics"));
+      assertEquals(1, metricService.count("raw_data_operator"));
+      assertEquals(30, metricService.sum("raw_data_operator"));
+
+      // Second flush with nothing newly recorded: the update counts must stay at 1.
+      context.recordAggregationCostToMetric();
+
+      assertEquals(1, metricService.count("raw_data"));
+      assertEquals(1, metricService.count("statistics"));
+      assertEquals(1, metricService.count("raw_data_operator"));
+    } finally {
+      // The metric set is a shared singleton, so it is unbound again before the test exits.
+      metricSet.unbindFrom(metricService);
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Creates a {@link FragmentInstanceContext} for instance {@code "0"} of fragment 0 of the query
+   * {@code "test_query"}.
+   *
+   * @param instanceNotificationExecutor executor handed to the {@link
+   *     FragmentInstanceStateMachine} created for the instance
+   * @return the context returned by {@code createFragmentInstanceContext}
+   */
+  private static FragmentInstanceContext newFragmentInstanceContext(
+      ExecutorService instanceNotificationExecutor) {
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(new QueryId("test_query"), 
0), "0");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    return createFragmentInstanceContext(instanceId, stateMachine);
+  }
+
+  /**
+   * Metric service stub that hands out a {@link RecordingTimer} for every {@code
+   * Metric.AGGREGATION} timer requested with exactly the tag pair ({@code Tag.FROM}, value), keyed
+   * by that value. All other timer requests are delegated to {@link DoNothingMetricService}.
+   */
+  private static class RecordingMetricService extends DoNothingMetricService {
+    /** Recording timers keyed by the value of the {@code Tag.FROM} tag. */
+    private final Map<String, RecordingTimer> timers = new HashMap<>();
+
+    /**
+     * Returns the recording timer for an aggregation timer tagged with {@code Tag.FROM}, creating
+     * it on first request; any other request falls through to the superclass.
+     */
+    @Override
+    public Timer getOrCreateTimer(String metric, MetricLevel metricLevel, 
String... tags) {
+      if (!Metric.AGGREGATION.toString().equals(metric)
+          || tags.length != 2
+          || !Tag.FROM.toString().equals(tags[0])) {
+        return super.getOrCreateTimer(metric, metricLevel, tags);
+      }
+      return timers.computeIfAbsent(tags[1], key -> new RecordingTimer());
+    }
+
+    /**
+     * Drops the recording timer matching an aggregation metric tagged with {@code Tag.FROM};
+     * removal requests for anything else are ignored (the superclass is not called).
+     */
+    @Override
+    public void remove(MetricType type, String metric, String... tags) {
+      if (Metric.AGGREGATION.toString().equals(metric)
+          && tags.length == 2
+          && Tag.FROM.toString().equals(tags[0])) {
+        timers.remove(tags[1]);
+      }
+    }
+
+    /** Returns how many updates the timer tagged {@code from} received, or 0 if none exists. */
+    long count(String from) {
+      RecordingTimer timer = timers.get(from);
+      return timer == null ? 0 : timer.getCount();
+    }
+
+    /**
+     * Returns the total duration in nanoseconds recorded by the timer tagged {@code from}, or 0 if
+     * none exists.
+     */
+    long sum(String from) {
+      RecordingTimer timer = timers.get(from);
+      return timer == null ? 0 : timer.sum.get();
+    }
+  }
+
+  /** {@link Timer} that only tracks the number of updates and the sum of their durations. */
+  private static class RecordingTimer implements Timer {
+    /** Number of {@code update} calls received. */
+    private final AtomicLong count = new AtomicLong();
+    /** Sum of all recorded durations, converted to nanoseconds. */
+    private final AtomicLong sum = new AtomicLong();
+
+    /** Counts the update and adds the duration, converted to nanoseconds, to the running sum. */
+    @Override
+    public void update(long duration, TimeUnit unit) {
+      count.incrementAndGet();
+      sum.addAndGet(unit.toNanos(duration));
+    }
+
+    /** Not supported by this test double; always throws. */
+    @Override
+    public HistogramSnapshot takeSnapshot() {
+      throw new UnsupportedOperationException();
+    }
+
+    /** Returns the number of updates recorded so far. */
+    @Override
+    public long getCount() {
+      return count.get();
+    }
+
+    /** Ignored; this test double keeps no object name. */
+    @Override
+    public void setObjectName(ObjectName objectName) {
+      // no-op
+    }
+  }
+}


Reply via email to