This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 818b6fc5581 [To dev/1.3] Optimize aggregation metric recording
818b6fc5581 is described below
commit 818b6fc5581ca47d4c3858956e63e24a3c8bb641
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 25 09:10:36 2026 +0800
[To dev/1.3] Optimize aggregation metric recording
---
.../execution/aggregation/Aggregator.java | 92 ++++------
.../fragment/FragmentInstanceContext.java | 52 ++++++
.../execution/operator/OperatorContext.java | 24 +++
.../operator/process/AggregationOperator.java | 25 +--
.../process/RawDataAggregationOperator.java | 129 ++++++-------
.../operator/process/TagAggregationOperator.java | 15 +-
.../AbstractSeriesAggregationScanOperator.java | 26 ++-
.../metric/QueryExecutionMetricSet.java | 15 +-
.../fragment/FragmentInstanceContextTest.java | 200 +++++++++++++++++++++
9 files changed, 434 insertions(+), 144 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
index 6992d80b53f..5661f54a85a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.execution.aggregation;
-import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
@@ -34,8 +33,6 @@ import java.util.Collections;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
-import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
public class Aggregator {
@@ -43,8 +40,6 @@ public class Aggregator {
// In some intermediate result input, inputLocation[] should include two columns
protected List<InputLocation[]> inputLocationList;
protected final AggregationStep step;
- protected static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
- QueryExecutionMetricSet.getInstance();
// Used for SeriesAggregateScanOperator
public Aggregator(Accumulator accumulator, AggregationStep step) {
@@ -64,57 +59,44 @@ public class Aggregator {
// Used for SeriesAggregateScanOperator and RawDataAggregateOperator
public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
- long startTime = System.nanoTime();
- try {
- checkArgument(
- step.isInputRaw(),
- "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
- timeAndValueColumn[0] = tsBlock.getTimeColumn();
- for (int i = 0; i < inputLocations.length; i++) {
- checkArgument(
- inputLocations[i].getTsBlockIndex() == 0,
- "RawDataAggregateOperator can only process one tsBlock input.");
- int index = inputLocations[i].getValueColumnIndex();
- // for count_time, time column is also its value column
- // for max_by, the input column can also be time column.
- timeAndValueColumn[1 + i] =
- index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
- }
- accumulator.addInput(timeAndValueColumn, bitMap);
+ checkArgument(
+ step.isInputRaw(),
+ "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
+ timeAndValueColumn[0] = tsBlock.getTimeColumn();
+ for (int i = 0; i < inputLocations.length; i++) {
+ checkArgument(
+ inputLocations[i].getTsBlockIndex() == 0,
+ "RawDataAggregateOperator can only process one tsBlock input.");
+ int index = inputLocations[i].getValueColumnIndex();
+ // for count_time, time column is also its value column
+ // for max_by, the input column can also be time column.
+ timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
}
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+ accumulator.addInput(timeAndValueColumn, bitMap);
}
}
// Used for AggregateOperator
public void processTsBlocks(TsBlock[] tsBlock) {
- long startTime = System.nanoTime();
- try {
- checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
- if (step.isInputFinal()) {
- checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
- Column finalResult =
- tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
- inputLocationList.get(0)[0].getValueColumnIndex());
- accumulator.setFinal(finalResult);
- } else {
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] columns = new Column[inputLocations.length];
- for (int i = 0; i < inputLocations.length; i++) {
- columns[i] =
- tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
- inputLocations[i].getValueColumnIndex());
- }
- accumulator.addIntermediate(columns);
+ checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
+ if (step.isInputFinal()) {
+ checkArgument(inputLocationList.size() == 1, "Final output can only be single column");
+ Column finalResult =
+ tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+ inputLocationList.get(0)[0].getValueColumnIndex());
+ accumulator.setFinal(finalResult);
+ } else {
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] columns = new Column[inputLocations.length];
+ for (int i = 0; i < inputLocations.length; i++) {
+ columns[i] =
+ tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+ inputLocations[i].getValueColumnIndex());
}
+ accumulator.addIntermediate(columns);
}
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
}
}
@@ -128,16 +110,10 @@ public class Aggregator {
/** Used for SeriesAggregateScanOperator. */
public void processStatistics(Statistics timeStatistics, Statistics[] valueStatistics) {
- long startTime = System.nanoTime();
- try {
- for (InputLocation[] inputLocations : inputLocationList) {
- int valueIndex = inputLocations[0].getValueColumnIndex();
- // valueIndex == -1 means it is count_time, we need to use timeStatistics
- accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]);
- }
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+ for (InputLocation[] inputLocations : inputLocationList) {
+ int valueIndex = inputLocations[0].getValueColumnIndex();
+ // valueIndex == -1 means it is count_time, we need to use timeStatistics
+ accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index a224e5b2c35..d579b7e15c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
+import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
@@ -77,6 +78,9 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
+import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA;
import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE;
import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE;
@@ -149,6 +153,9 @@ public class FragmentInstanceContext extends QueryContext {
private int initQueryDataSourceRetryCount = 0;
private final AtomicLong readyQueueTime = new AtomicLong(0);
private final AtomicLong blockQueueTime = new AtomicLong(0);
+ private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0);
+ private final AtomicLong scanAggregationFromStatisticsCost = new AtomicLong(0);
+ private final AtomicLong aggregationOperatorFromRawDataCost = new AtomicLong(0);
private long unclosedSeqFileNum = 0;
private long unclosedUnseqFileNum = 0;
private long closedSeqFileNum = 0;
@@ -970,6 +977,8 @@ public class FragmentInstanceContext extends QueryContext {
QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime);
+ recordAggregationCostToMetric();
+
QueryResourceMetricSet.getInstance()
.recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount());
@@ -1115,6 +1124,49 @@ public class FragmentInstanceContext extends QueryContext {
blockQueueTime.addAndGet(time);
}
+ public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+ addCost(scanAggregationFromRawDataCost, costTimeInNanos);
+ }
+
+ public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+ addCost(scanAggregationFromStatisticsCost, costTimeInNanos);
+ }
+
+ public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+ addCost(aggregationOperatorFromRawDataCost, costTimeInNanos);
+ }
+
+ private void addCost(AtomicLong cost, long costTimeInNanos) {
+ if (costTimeInNanos > 0) {
+ cost.addAndGet(costTimeInNanos);
+ }
+ }
+
+ long drainScanAggregationFromRawDataCost() {
+ return scanAggregationFromRawDataCost.getAndSet(0);
+ }
+
+ long drainScanAggregationFromStatisticsCost() {
+ return scanAggregationFromStatisticsCost.getAndSet(0);
+ }
+
+ long drainAggregationOperatorFromRawDataCost() {
+ return aggregationOperatorFromRawDataCost.getAndSet(0);
+ }
+
+ void recordAggregationCostToMetric() {
+ recordAggregationCost(AGGREGATION_FROM_RAW_DATA, drainScanAggregationFromRawDataCost());
+ recordAggregationCost(AGGREGATION_FROM_STATISTICS, drainScanAggregationFromStatisticsCost());
+ recordAggregationCost(
+ AGGREGATION_OPERATOR_FROM_RAW_DATA, drainAggregationOperatorFromRawDataCost());
+ }
+
+ private void recordAggregationCost(String stage, long costTimeInNanos) {
+ if (costTimeInNanos > 0) {
+ QueryExecutionMetricSet.getInstance().recordExecutionCost(stage, costTimeInNanos);
+ }
+ }
+
public long getReadyQueueTime() {
return readyQueueTime.get();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index 49be9ad1cc0..90bded8a0ee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -119,6 +119,30 @@ public class OperatorContext implements Accountable {
return getInstanceContext().getSessionInfo();
}
+ public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+ if (driverContext != null && driverContext.getFragmentInstanceContext() != null) {
+ driverContext
+ .getFragmentInstanceContext()
+ .recordScanAggregationFromRawDataCost(costTimeInNanos);
+ }
+ }
+
+ public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+ if (driverContext != null && driverContext.getFragmentInstanceContext() != null) {
+ driverContext
+ .getFragmentInstanceContext()
+ .recordScanAggregationFromStatisticsCost(costTimeInNanos);
+ }
+ }
+
+ public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+ if (driverContext != null && driverContext.getFragmentInstanceContext() != null) {
+ driverContext
+ .getFragmentInstanceContext()
+ .recordAggregationOperatorFromRawDataCost(costTimeInNanos);
+ }
+ }
+
public PlanNodeId getPlanNodeId() {
return planNodeId;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
index ff1de243b27..ae36cc39c6b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
@@ -153,19 +153,24 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
private void calculateNextAggregationResult() {
// Consume current input tsBlocks
- for (Aggregator aggregator : aggregators) {
- aggregator.processTsBlocks(inputTsBlocks);
- }
+ long startTime = System.nanoTime();
+ try {
+ for (Aggregator aggregator : aggregators) {
+ aggregator.processTsBlocks(inputTsBlocks);
+ }
- for (int i = 0; i < inputOperatorsCount; i++) {
- inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
- if (inputTsBlocks[i].isEmpty()) {
- inputTsBlocks[i] = null;
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
+ if (inputTsBlocks[i].isEmpty()) {
+ inputTsBlocks[i] = null;
+ }
}
- }
- // Update result using aggregators
- updateResultTsBlock();
+ // Update result using aggregators
+ updateResultTsBlock();
+ } finally {
+ operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
+ }
}
private void updateResultTsBlock() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
index 69f67503cc4..3ca36ff7d67 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
@@ -150,78 +150,83 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
@SuppressWarnings({"squid:S3776", "squid:S135"})
private boolean calculateFromRawData() {
- // if window is not initialized, we should init window status and reset aggregators
- if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) {
- return false;
- }
-
- // If current window has been initialized, we should judge whether inputTsBlock is empty
- if (inputTsBlock == null || inputTsBlock.isEmpty()) {
- return false;
- }
-
- if (windowManager.satisfiedCurWindow(inputTsBlock)) {
-
- // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row
- // needed to be processed.
- int tsBlockSize = inputTsBlock.getPositionCount();
- IWindow curWindow = windowManager.getCurWindow();
-
- Column[] controlAndTimeColumn = new Column[2];
- controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
- controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+ long startTime = System.nanoTime();
+ try {
+ // if window is not initialized, we should init window status and reset aggregators
+ if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) {
+ return false;
+ }
- BitMap needProcess = new BitMap(tsBlockSize);
- int lastIndexToProcess = -1;
- boolean hasSkip = false;
+ // If current window has been initialized, we should judge whether inputTsBlock is empty
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return false;
+ }
- for (int i = 0; i < tsBlockSize; i++) {
- if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) {
+ if (windowManager.satisfiedCurWindow(inputTsBlock)) {
+
+ // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row
+ // needed to be processed.
+ int tsBlockSize = inputTsBlock.getPositionCount();
+ IWindow curWindow = windowManager.getCurWindow();
+
+ Column[] controlAndTimeColumn = new Column[2];
+ controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
+ controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+
+ BitMap needProcess = new BitMap(tsBlockSize);
+ int lastIndexToProcess = -1;
+ boolean hasSkip = false;
+
+ for (int i = 0; i < tsBlockSize; i++) {
+ if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) {
+ lastIndexToProcess = i;
+ hasSkip = true;
+ continue;
+ }
+ if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
+ break;
+ }
+ needProcess.mark(i);
+ curWindow.mergeOnePoint(controlAndTimeColumn, i);
lastIndexToProcess = i;
- hasSkip = true;
- continue;
}
- if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
- break;
+
+ // if no row needs to skip, just send a null parameter.
+ if (!hasSkip) {
+ needProcess = null;
}
- needProcess.mark(i);
- curWindow.mergeOnePoint(controlAndTimeColumn, i);
- lastIndexToProcess = i;
- }
- // if no row needs to skip, just send a null parameter.
- if (!hasSkip) {
- needProcess = null;
- }
+ TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
+ for (Aggregator aggregator : aggregators) {
+ // Current agg method has been calculated
+ if (aggregator.hasFinalResult()) {
+ continue;
+ }
- TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
- for (Aggregator aggregator : aggregators) {
- // Current agg method has been calculated
- if (aggregator.hasFinalResult()) {
- continue;
+ aggregator.processTsBlock(inputRegion, needProcess);
+ }
+ int lastReadRowIndex = lastIndexToProcess + 1;
+ // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in
+ // aggregators.
+ if (lastReadRowIndex != 0) {
+ hasCachedDataInAggregator = true;
+ }
+ if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
+ inputTsBlock = null;
+ // For the last index of TsBlock, if we can know the aggregation calculation is over
+ // we can directly updateResultTsBlock and return true
+ return isAllAggregatorsHasFinalResult(aggregators);
+ } else {
+ inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
+ return true;
}
-
- aggregator.processTsBlock(inputRegion, needProcess);
- }
- int lastReadRowIndex = lastIndexToProcess + 1;
- // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in
- // aggregators.
- if (lastReadRowIndex != 0) {
- hasCachedDataInAggregator = true;
- }
- if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
- inputTsBlock = null;
- // For the last index of TsBlock, if we can know the aggregation calculation is over
- // we can directly updateResultTsBlock and return true
- return isAllAggregatorsHasFinalResult(aggregators);
- } else {
- inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
- return true;
}
- }
- boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock);
- return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound;
+ boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock);
+ return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound;
+ } finally {
+ operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
+ }
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
index d7399fb4554..243b9ee0744 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
@@ -122,12 +122,17 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
}
private void aggregate(List<Aggregator> aggregators, TsBlock[] rowBlocks) {
- for (Aggregator aggregator : aggregators) {
- if (aggregator == null) {
- continue;
+ long startTime = System.nanoTime();
+ try {
+ for (Aggregator aggregator : aggregators) {
+ if (aggregator == null) {
+ continue;
+ }
+ aggregator.reset();
+ aggregator.processTsBlocks(rowBlocks);
}
- aggregator.reset();
- aggregator.processTsBlocks(rowBlocks);
+ } finally {
+ operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index fe24513ae43..0d69b933ee7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -244,18 +244,28 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
}
private boolean calcFromRawData(TsBlock tsBlock) {
- Pair<Boolean, TsBlock> calcResult =
- calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
- inputTsBlock = calcResult.getRight();
- return calcResult.getLeft();
+ long startTime = System.nanoTime();
+ try {
+ Pair<Boolean, TsBlock> calcResult =
+ calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
+ inputTsBlock = calcResult.getRight();
+ return calcResult.getLeft();
+ } finally {
+ operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime);
+ }
}
protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) {
- for (Aggregator aggregator : aggregators) {
- if (aggregator.hasFinalResult()) {
- continue;
+ long startTime = System.nanoTime();
+ try {
+ for (Aggregator aggregator : aggregators) {
+ if (aggregator.hasFinalResult()) {
+ continue;
+ }
+ aggregator.processStatistics(timeStatistics, valueStatistics);
}
- aggregator.processStatistics(timeStatistics, valueStatistics);
+ } finally {
+ operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java
index 27a238d3d8c..8766dc60624 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java
@@ -143,8 +143,11 @@ public class QueryExecutionMetricSet implements IMetricSet {
// region query aggregation
public static final String AGGREGATION_FROM_RAW_DATA = "aggregation_from_raw_data";
public static final String AGGREGATION_FROM_STATISTICS = "aggregation_from_statistics";
+ public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA =
+ "aggregation_operator_from_raw_data";
private Timer aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private void bindQueryAggregation(AbstractMetricService metricService) {
aggregationFromRawDataTimer =
@@ -156,12 +159,19 @@ public class QueryExecutionMetricSet implements IMetricSet {
MetricLevel.IMPORTANT,
Tag.FROM.toString(),
"statistics");
+ aggregationOperatorFromRawDataTimer =
+ metricService.getOrCreateTimer(
+ Metric.AGGREGATION.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.FROM.toString(),
+ "raw_data_operator");
}
private void unbindQueryAggregation(AbstractMetricService metricService) {
aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
- Arrays.asList("raw_data", "statistics")
+ aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ Arrays.asList("raw_data", "statistics", "raw_data_operator")
.forEach(
from ->
metricService.remove(
@@ -213,6 +223,9 @@ public class QueryExecutionMetricSet implements IMetricSet {
case AGGREGATION_FROM_STATISTICS:
aggregationFromStatisticsTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS);
break;
+ case AGGREGATION_OPERATOR_FROM_RAW_DATA:
+ aggregationOperatorFromRawDataTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS);
+ break;
default:
break;
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
new file mode 100644
index 00000000000..6f085e6235b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.metrics.DoNothingMetricService;
+import org.apache.iotdb.metrics.type.HistogramSnapshot;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.junit.Test;
+
+import javax.management.ObjectName;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+
+public class FragmentInstanceContextTest {
+
+ @Test
+ public void testDrainAggregationCostsSeparatelyOnce() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+
+ context.recordScanAggregationFromRawDataCost(10);
+ context.recordScanAggregationFromRawDataCost(20);
+ context.recordScanAggregationFromStatisticsCost(30);
+ context.recordAggregationOperatorFromRawDataCost(40);
+
+ assertEquals(30, context.drainScanAggregationFromRawDataCost());
+ assertEquals(30, context.drainScanAggregationFromStatisticsCost());
+ assertEquals(40, context.drainAggregationOperatorFromRawDataCost());
+
+ assertEquals(0, context.drainScanAggregationFromRawDataCost());
+ assertEquals(0, context.drainScanAggregationFromStatisticsCost());
+ assertEquals(0, context.drainAggregationOperatorFromRawDataCost());
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+ DriverContext driverContext = new DriverContext(context, 0);
+ OperatorContext operatorContext =
+ driverContext.addOperatorContext(1, new PlanNodeId("aggregation"), "aggregation");
+
+ operatorContext.recordScanAggregationFromRawDataCost(11);
+ operatorContext.recordScanAggregationFromStatisticsCost(13);
+ operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+ assertEquals(11, context.drainScanAggregationFromRawDataCost());
+ assertEquals(13, context.drainScanAggregationFromStatisticsCost());
+ assertEquals(17, context.drainAggregationOperatorFromRawDataCost());
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void testAggregationCostsFlushToMetricStagesOnce() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ RecordingMetricService metricService = new RecordingMetricService();
+ QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance();
+ metricSet.bindTo(metricService);
+ try {
+ FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor);
+
+ context.recordScanAggregationFromRawDataCost(10);
+ context.recordScanAggregationFromStatisticsCost(20);
+ context.recordAggregationOperatorFromRawDataCost(30);
+
+ context.recordAggregationCostToMetric();
+
+ assertEquals(1, metricService.count("raw_data"));
+ assertEquals(10, metricService.sum("raw_data"));
+ assertEquals(1, metricService.count("statistics"));
+ assertEquals(20, metricService.sum("statistics"));
+ assertEquals(1, metricService.count("raw_data_operator"));
+ assertEquals(30, metricService.sum("raw_data_operator"));
+
+ context.recordAggregationCostToMetric();
+
+ assertEquals(1, metricService.count("raw_data"));
+ assertEquals(1, metricService.count("statistics"));
+ assertEquals(1, metricService.count("raw_data_operator"));
+ } finally {
+ metricSet.unbindFrom(metricService);
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ private static FragmentInstanceContext newFragmentInstanceContext(
+ ExecutorService instanceNotificationExecutor) {
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(new QueryId("test_query"), 0), "0");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ return createFragmentInstanceContext(instanceId, stateMachine);
+ }
+
+ private static class RecordingMetricService extends DoNothingMetricService {
+ private final Map<String, RecordingTimer> timers = new HashMap<>();
+
+ @Override
+ public Timer getOrCreateTimer(String metric, MetricLevel metricLevel, String... tags) {
+ if (!Metric.AGGREGATION.toString().equals(metric)
+ || tags.length != 2
+ || !Tag.FROM.toString().equals(tags[0])) {
+ return super.getOrCreateTimer(metric, metricLevel, tags);
+ }
+ return timers.computeIfAbsent(tags[1], key -> new RecordingTimer());
+ }
+
+ @Override
+ public void remove(MetricType type, String metric, String... tags) {
+ if (Metric.AGGREGATION.toString().equals(metric)
+ && tags.length == 2
+ && Tag.FROM.toString().equals(tags[0])) {
+ timers.remove(tags[1]);
+ }
+ }
+
+ long count(String from) {
+ RecordingTimer timer = timers.get(from);
+ return timer == null ? 0 : timer.getCount();
+ }
+
+ long sum(String from) {
+ RecordingTimer timer = timers.get(from);
+ return timer == null ? 0 : timer.sum.get();
+ }
+ }
+
+ private static class RecordingTimer implements Timer {
+ private final AtomicLong count = new AtomicLong();
+ private final AtomicLong sum = new AtomicLong();
+
+ @Override
+ public void update(long duration, TimeUnit unit) {
+ count.incrementAndGet();
+ sum.addAndGet(unit.toNanos(duration));
+ }
+
+ @Override
+ public HistogramSnapshot takeSnapshot() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCount() {
+ return count.get();
+ }
+
+ @Override
+ public void setObjectName(ObjectName objectName) {
+ // no-op
+ }
+ }
+}