This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 15dba18b1c7 Fix aggregation metric recording (#18018)
15dba18b1c7 is described below
commit 15dba18b1c702fa8f5fbd8118438043a3c57ca63
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Jun 24 15:51:42 2026 +0800
Fix aggregation metric recording (#18018)
---
.../execution/operator/CommonOperatorContext.java | 15 +
.../aggregation/AggregationOperator.java | 15 +-
.../relational/aggregation/TableAggregator.java | 49 +-
.../grouped/StreamingAggregationOperator.java | 9 +-
.../builder/InMemoryHashAggregationBuilder.java | 9 +-
.../iotdb/calc/metric/QueryExecutionMetricSet.java | 15 +-
.../execution/aggregation/TreeAggregator.java | 92 ++-
.../fragment/FragmentInstanceContext.java | 52 ++
.../execution/operator/OperatorContext.java | 27 +
.../operator/process/AggregationOperator.java | 25 +-
.../process/RawDataAggregationOperator.java | 129 +++--
.../operator/process/TagAggregationOperator.java | 15 +-
.../AbstractSeriesAggregationScanOperator.java | 26 +-
.../relational/AbstractAggTableScanOperator.java | 85 +--
.../fragment/FragmentInstanceContextTest.java | 623 +++++++++++++++++++++
15 files changed, 967 insertions(+), 219 deletions(-)
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
index 2c99fa5d777..0ffe44716ce 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java
@@ -103,6 +103,21 @@ public abstract class CommonOperatorContext implements
Accountable {
this.totalExecutionTimeInNanos += executionTimeInNanos;
}
+ public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+ // calc-commons operators can run in tests or standalone contexts that are
not backed by a
+ // DataNode FragmentInstanceContext. DataNode OperatorContext overrides
this to forward costs.
+ }
+
+ public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+ // calc-commons operators can run in tests or standalone contexts that are
not backed by a
+ // DataNode FragmentInstanceContext. DataNode OperatorContext overrides
this to forward costs.
+ }
+
+ public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+ // calc-commons operators can run in tests or standalone contexts that are
not backed by a
+ // DataNode FragmentInstanceContext. DataNode OperatorContext overrides
this to forward costs.
+ }
+
public void recordNextCalled() {
this.nextCalledCount++;
}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
index e65ee82517d..047894fa8bc 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java
@@ -92,9 +92,7 @@ public class AggregationOperator implements ProcessOperator {
return null;
}
- for (TableAggregator aggregator : aggregators) {
- aggregator.processBlock(block);
- }
+ processBlock(block);
return null;
} else {
@@ -127,6 +125,17 @@ public class AggregationOperator implements
ProcessOperator {
return operatorContext;
}
+ private void processBlock(TsBlock block) {
+ long startTime = System.nanoTime();
+ try {
+ for (TableAggregator aggregator : aggregators) {
+ aggregator.processBlock(block);
+ }
+ } finally {
+
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() -
startTime);
+ }
+ }
+
@Override
public long calculateMaxPeekMemory() {
return Math.max(
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
index 1df3f6fb159..c7a7b6b7895 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.calc.execution.operator.source.relational.aggregation;
-import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils;
import
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
@@ -36,13 +35,9 @@ import java.util.OptionalInt;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-import static
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
public class TableAggregator {
- public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
- QueryExecutionMetricSet.getInstance();
-
private final TableAccumulator accumulator;
private final AggregationNode.Step step;
private final TSDataType outputType;
@@ -70,34 +65,28 @@ public class TableAggregator {
}
public void processBlock(TsBlock block) {
- long startTime = System.nanoTime();
- try {
- Column[] arguments = block.getColumns(inputChannels);
-
- // process count(*)
- if (arguments.length == 0) {
- arguments =
- new Column[] {
- new RunLengthEncodedColumn(
- CommonOperatorUtils.TIME_COLUMN_TEMPLATE,
block.getPositionCount())
- };
- }
-
- if (step.isInputRaw()) {
- // Use select-all AggregationMask here because filter of Agg-Function
is not supported now
- AggregationMask mask =
AggregationMask.createSelectAll(block.getPositionCount());
+ Column[] arguments = block.getColumns(inputChannels);
+
+ // process count(*)
+ if (arguments.length == 0) {
+ arguments =
+ new Column[] {
+ new RunLengthEncodedColumn(
+ CommonOperatorUtils.TIME_COLUMN_TEMPLATE,
block.getPositionCount())
+ };
+ }
- if (maskChannel.isPresent()) {
- mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt()));
- }
+ if (step.isInputRaw()) {
+ // Use select-all AggregationMask here because filter of Agg-Function is
not supported now
+ AggregationMask mask =
AggregationMask.createSelectAll(block.getPositionCount());
- accumulator.addInput(arguments, mask);
- } else {
- accumulator.addIntermediate(arguments[0]);
+ if (maskChannel.isPresent()) {
+ mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt()));
}
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+
+ accumulator.addInput(arguments, mask);
+ } else {
+ accumulator.addIntermediate(arguments[0]);
}
}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
index 7609ac32926..9808a1b4eed 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
@@ -176,8 +176,13 @@ public class StreamingAggregationOperator extends
AbstractOperator {
private void addRowsToAggregators(TsBlock page, int startPosition, int
endPosition) {
TsBlock region = page.getRegion(startPosition, endPosition - startPosition
+ 1);
- for (TableAggregator aggregator : aggregators) {
- aggregator.processBlock(region);
+ long startTime = System.nanoTime();
+ try {
+ for (TableAggregator aggregator : aggregators) {
+ aggregator.processBlock(region);
+ }
+ } finally {
+
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() -
startTime);
}
}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
index f372717a92f..07e4cb9adbb 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java
@@ -140,8 +140,13 @@ public class InMemoryHashAggregationBuilder implements
HashAggregationBuilder {
operatorContext.recordSpecifiedInfo(MAX_GROUP_NUMBER,
Long.toString(groupCount));
maxGroupNumber = groupCount;
}
- for (GroupedAggregator groupedAggregator : groupedAggregators) {
- groupedAggregator.processBlock(groupCount, groupByIdBlock, block);
+ long startTime = System.nanoTime();
+ try {
+ for (GroupedAggregator groupedAggregator : groupedAggregators) {
+ groupedAggregator.processBlock(groupCount, groupByIdBlock, block);
+ }
+ } finally {
+
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() -
startTime);
}
}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
index 636b11b4cc0..5452e544509 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java
@@ -143,8 +143,11 @@ public class QueryExecutionMetricSet implements IMetricSet
{
// region query aggregation
public static final String AGGREGATION_FROM_RAW_DATA =
"aggregation_from_raw_data";
public static final String AGGREGATION_FROM_STATISTICS =
"aggregation_from_statistics";
+ public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA =
+ "aggregation_operator_from_raw_data";
private Timer aggregationFromRawDataTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer aggregationFromStatisticsTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer aggregationOperatorFromRawDataTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private void bindQueryAggregation(AbstractMetricService metricService) {
aggregationFromRawDataTimer =
@@ -156,12 +159,19 @@ public class QueryExecutionMetricSet implements
IMetricSet {
MetricLevel.IMPORTANT,
Tag.FROM.toString(),
"statistics");
+ aggregationOperatorFromRawDataTimer =
+ metricService.getOrCreateTimer(
+ Metric.AGGREGATION.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.FROM.toString(),
+ "raw_data_operator");
}
private void unbindQueryAggregation(AbstractMetricService metricService) {
aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
- Arrays.asList("raw_data", "statistics")
+ aggregationOperatorFromRawDataTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ Arrays.asList("raw_data", "statistics", "raw_data_operator")
.forEach(
from ->
metricService.remove(
@@ -213,6 +223,9 @@ public class QueryExecutionMetricSet implements IMetricSet {
case AGGREGATION_FROM_STATISTICS:
aggregationFromStatisticsTimer.update(costTimeInNanos,
TimeUnit.NANOSECONDS);
break;
+ case AGGREGATION_OPERATOR_FROM_RAW_DATA:
+ aggregationOperatorFromRawDataTimer.update(costTimeInNanos,
TimeUnit.NANOSECONDS);
+ break;
default:
break;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
index 07b482bbdfb..2e8650f50aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.queryengine.execution.aggregation;
import org.apache.iotdb.calc.execution.aggregation.Accumulator;
-import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
import
org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
@@ -35,8 +34,6 @@ import java.util.Collections;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
-import static
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
-import static
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
public class TreeAggregator {
@@ -44,8 +41,6 @@ public class TreeAggregator {
// In some intermediate result input, inputLocation[] should include two
columns
protected List<InputLocation[]> inputLocationList;
protected final AggregationStep step;
- public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
- QueryExecutionMetricSet.getInstance();
// Used for SeriesAggregateScanOperator
public TreeAggregator(Accumulator accumulator, AggregationStep step) {
@@ -65,57 +60,44 @@ public class TreeAggregator {
// Used for SeriesAggregateScanOperator and RawDataAggregateOperator
public void processTsBlock(TsBlock tsBlock, BitMap bitMap) {
- long startTime = System.nanoTime();
- try {
- checkArgument(
- step.isInputRaw(),
- "Step in SeriesAggregateScanOperator and RawDataAggregateOperator
can only process raw input");
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
- timeAndValueColumn[0] = tsBlock.getTimeColumn();
- for (int i = 0; i < inputLocations.length; i++) {
- checkArgument(
- inputLocations[i].getTsBlockIndex() == 0,
- "RawDataAggregateOperator can only process one tsBlock input.");
- int index = inputLocations[i].getValueColumnIndex();
- // for count_time, time column is also its value column
- // for max_by, the input column can also be time column.
- timeAndValueColumn[1 + i] =
- index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index);
- }
- accumulator.addInput(timeAndValueColumn, bitMap);
+ checkArgument(
+ step.isInputRaw(),
+ "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can
only process raw input");
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] timeAndValueColumn = new Column[1 + inputLocations.length];
+ timeAndValueColumn[0] = tsBlock.getTimeColumn();
+ for (int i = 0; i < inputLocations.length; i++) {
+ checkArgument(
+ inputLocations[i].getTsBlockIndex() == 0,
+ "RawDataAggregateOperator can only process one tsBlock input.");
+ int index = inputLocations[i].getValueColumnIndex();
+ // for count_time, time column is also its value column
+ // for max_by, the input column can also be time column.
+ timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] :
tsBlock.getColumn(index);
}
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
+ accumulator.addInput(timeAndValueColumn, bitMap);
}
}
// Used for AggregateOperator
public void processTsBlocks(TsBlock[] tsBlock) {
- long startTime = System.nanoTime();
- try {
- checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot
process raw input");
- if (step.isInputFinal()) {
- checkArgument(inputLocationList.size() == 1, "Final output can only be
single column");
- Column finalResult =
- tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
- inputLocationList.get(0)[0].getValueColumnIndex());
- accumulator.setFinal(finalResult);
- } else {
- for (InputLocation[] inputLocations : inputLocationList) {
- Column[] columns = new Column[inputLocations.length];
- for (int i = 0; i < inputLocations.length; i++) {
- columns[i] =
- tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
- inputLocations[i].getValueColumnIndex());
- }
- accumulator.addIntermediate(columns);
+ checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot
process raw input");
+ if (step.isInputFinal()) {
+ checkArgument(inputLocationList.size() == 1, "Final output can only be
single column");
+ Column finalResult =
+ tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn(
+ inputLocationList.get(0)[0].getValueColumnIndex());
+ accumulator.setFinal(finalResult);
+ } else {
+ for (InputLocation[] inputLocations : inputLocationList) {
+ Column[] columns = new Column[inputLocations.length];
+ for (int i = 0; i < inputLocations.length; i++) {
+ columns[i] =
+ tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+ inputLocations[i].getValueColumnIndex());
}
+ accumulator.addIntermediate(columns);
}
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
}
}
@@ -129,16 +111,10 @@ public class TreeAggregator {
/** Used for SeriesAggregateScanOperator. */
public void processStatistics(Statistics timeStatistics, Statistics[]
valueStatistics) {
- long startTime = System.nanoTime();
- try {
- for (InputLocation[] inputLocations : inputLocationList) {
- int valueIndex = inputLocations[0].getValueColumnIndex();
- // valueIndex == -1 means it is count_time, we need to use
timeStatistics
- accumulator.addStatistics(valueIndex == -1 ? timeStatistics :
valueStatistics[valueIndex]);
- }
- } finally {
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime);
+ for (InputLocation[] inputLocations : inputLocationList) {
+ int valueIndex = inputLocations[0].getValueColumnIndex();
+ // valueIndex == -1 means it is count_time, we need to use timeStatistics
+ accumulator.addStatistics(valueIndex == -1 ? timeStatistics :
valueStatistics[valueIndex]);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 47eab012364..3ffa7e63e15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.execution.fragment;
import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
import org.apache.iotdb.calc.exception.QueryProcessException;
+import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.audit.UserEntity;
@@ -86,6 +87,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA;
+import static
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS;
+import static
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA;
import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
@@ -168,6 +172,9 @@ public class FragmentInstanceContext extends QueryContext {
private int initQueryDataSourceRetryCount = 0;
private final AtomicLong readyQueueTime = new AtomicLong(0);
private final AtomicLong blockQueueTime = new AtomicLong(0);
+ private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0);
+ private final AtomicLong scanAggregationFromStatisticsCost = new
AtomicLong(0);
+ private final AtomicLong aggregationOperatorFromRawDataCost = new
AtomicLong(0);
private long unclosedSeqFileNum = 0;
private long unclosedUnseqFileNum = 0;
private long closedSeqFileNum = 0;
@@ -1117,6 +1124,8 @@ public class FragmentInstanceContext extends QueryContext
{
QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime);
+ recordAggregationCostToMetric();
+
QueryResourceMetricSet.getInstance()
.recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount());
@@ -1264,6 +1273,49 @@ public class FragmentInstanceContext extends
QueryContext {
blockQueueTime.addAndGet(time);
}
+ public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+ addCost(scanAggregationFromRawDataCost, costTimeInNanos);
+ }
+
+ public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+ addCost(scanAggregationFromStatisticsCost, costTimeInNanos);
+ }
+
+ public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+ addCost(aggregationOperatorFromRawDataCost, costTimeInNanos);
+ }
+
+ private void addCost(AtomicLong cost, long costTimeInNanos) {
+ if (costTimeInNanos > 0) {
+ cost.addAndGet(costTimeInNanos);
+ }
+ }
+
+ long drainScanAggregationFromRawDataCost() {
+ return scanAggregationFromRawDataCost.getAndSet(0);
+ }
+
+ long drainScanAggregationFromStatisticsCost() {
+ return scanAggregationFromStatisticsCost.getAndSet(0);
+ }
+
+ long drainAggregationOperatorFromRawDataCost() {
+ return aggregationOperatorFromRawDataCost.getAndSet(0);
+ }
+
+ void recordAggregationCostToMetric() {
+ recordAggregationCost(AGGREGATION_FROM_RAW_DATA,
drainScanAggregationFromRawDataCost());
+ recordAggregationCost(AGGREGATION_FROM_STATISTICS,
drainScanAggregationFromStatisticsCost());
+ recordAggregationCost(
+ AGGREGATION_OPERATOR_FROM_RAW_DATA,
drainAggregationOperatorFromRawDataCost());
+ }
+
+ private void recordAggregationCost(String stage, long costTimeInNanos) {
+ if (costTimeInNanos > 0) {
+ QueryExecutionMetricSet.getInstance().recordExecutionCost(stage,
costTimeInNanos);
+ }
+ }
+
public long getReadyQueueTime() {
return readyQueueTime.get();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index d9761552e28..584cb4fe74c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -71,6 +71,33 @@ public class OperatorContext extends CommonOperatorContext {
return getInstanceContext().getSessionInfo();
}
+ @Override
+ public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
+ if (driverContext != null && driverContext.getFragmentInstanceContext() !=
null) {
+ driverContext
+ .getFragmentInstanceContext()
+ .recordScanAggregationFromRawDataCost(costTimeInNanos);
+ }
+ }
+
+ @Override
+ public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) {
+ if (driverContext != null && driverContext.getFragmentInstanceContext() !=
null) {
+ driverContext
+ .getFragmentInstanceContext()
+ .recordScanAggregationFromStatisticsCost(costTimeInNanos);
+ }
+ }
+
+ @Override
+ public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) {
+ if (driverContext != null && driverContext.getFragmentInstanceContext() !=
null) {
+ driverContext
+ .getFragmentInstanceContext()
+ .recordAggregationOperatorFromRawDataCost(costTimeInNanos);
+ }
+ }
+
@Override
public MemoryReservationManager getMemoryReservationContext() {
return getInstanceContext().getMemoryReservationContext();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
index 51f6e1c8117..eec1c017430 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java
@@ -154,19 +154,24 @@ public class AggregationOperator extends
AbstractConsumeAllOperator {
private void calculateNextAggregationResult() {
// Consume current input tsBlocks
- for (TreeAggregator aggregator : aggregators) {
- aggregator.processTsBlocks(inputTsBlocks);
- }
+ long startTime = System.nanoTime();
+ try {
+ for (TreeAggregator aggregator : aggregators) {
+ aggregator.processTsBlocks(inputTsBlocks);
+ }
- for (int i = 0; i < inputOperatorsCount; i++) {
- inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
- if (inputTsBlocks[i].isEmpty()) {
- inputTsBlocks[i] = null;
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ inputTsBlocks[i] = inputTsBlocks[i].skipFirst();
+ if (inputTsBlocks[i].isEmpty()) {
+ inputTsBlocks[i] = null;
+ }
}
- }
- // Update result using aggregators
- updateResultTsBlock();
+ // Update result using aggregators
+ updateResultTsBlock();
+ } finally {
+
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() -
startTime);
+ }
}
private void updateResultTsBlock() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
index 3018adfd5e1..385cc48e67c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java
@@ -150,78 +150,83 @@ public class RawDataAggregationOperator extends
SingleInputAggregationOperator {
@SuppressWarnings({"squid:S3776", "squid:S135"})
private boolean calculateFromRawData() {
- // if window is not initialized, we should init window status and reset
aggregators
- if (!windowManager.isCurWindowInit() &&
!skipPreviousWindowAndInitCurWindow()) {
- return false;
- }
-
- // If current window has been initialized, we should judge whether
inputTsBlock is empty
- if (inputTsBlock == null || inputTsBlock.isEmpty()) {
- return false;
- }
-
- if (windowManager.satisfiedCurWindow(inputTsBlock)) {
-
- // Get the indexes in tsBlock which needs to be processed by aggregator,
and the last row
- // needed to be processed.
- int tsBlockSize = inputTsBlock.getPositionCount();
- IWindow curWindow = windowManager.getCurWindow();
-
- Column[] controlAndTimeColumn = new Column[2];
- controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
- controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+ long startTime = System.nanoTime();
+ try {
+ // if window is not initialized, we should init window status and reset
aggregators
+ if (!windowManager.isCurWindowInit() &&
!skipPreviousWindowAndInitCurWindow()) {
+ return false;
+ }
- BitMap needProcess = new BitMap(tsBlockSize);
- int lastIndexToProcess = -1;
- boolean hasSkip = false;
+ // If current window has been initialized, we should judge whether
inputTsBlock is empty
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return false;
+ }
- for (int i = 0; i < tsBlockSize; i++) {
- if (windowManager.isIgnoringNull() &&
controlAndTimeColumn[0].isNull(i)) {
+ if (windowManager.satisfiedCurWindow(inputTsBlock)) {
+
+ // Get the indexes in tsBlock which needs to be processed by
aggregator, and the last row
+ // needed to be processed.
+ int tsBlockSize = inputTsBlock.getPositionCount();
+ IWindow curWindow = windowManager.getCurWindow();
+
+ Column[] controlAndTimeColumn = new Column[2];
+ controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
+ controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+
+ BitMap needProcess = new BitMap(tsBlockSize);
+ int lastIndexToProcess = -1;
+ boolean hasSkip = false;
+
+ for (int i = 0; i < tsBlockSize; i++) {
+ if (windowManager.isIgnoringNull() &&
controlAndTimeColumn[0].isNull(i)) {
+ lastIndexToProcess = i;
+ hasSkip = true;
+ continue;
+ }
+ if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
+ break;
+ }
+ needProcess.mark(i);
+ curWindow.mergeOnePoint(controlAndTimeColumn, i);
lastIndexToProcess = i;
- hasSkip = true;
- continue;
}
- if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
- break;
+
+ // if no row needs to skip, just send a null parameter.
+ if (!hasSkip) {
+ needProcess = null;
}
- needProcess.mark(i);
- curWindow.mergeOnePoint(controlAndTimeColumn, i);
- lastIndexToProcess = i;
- }
- // if no row needs to skip, just send a null parameter.
- if (!hasSkip) {
- needProcess = null;
- }
+ TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess +
1);
+ for (TreeAggregator aggregator : aggregators) {
+ // Current agg method has been calculated
+ if (aggregator.hasFinalResult()) {
+ continue;
+ }
- TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
- for (TreeAggregator aggregator : aggregators) {
- // Current agg method has been calculated
- if (aggregator.hasFinalResult()) {
- continue;
+ aggregator.processTsBlock(inputRegion, needProcess);
+ }
+ int lastReadRowIndex = lastIndexToProcess + 1;
+ // If lastReadRowIndex is not zero, some of tsBlock is consumed and
result is cached in
+ // aggregators.
+ if (lastReadRowIndex != 0) {
+ hasCachedDataInAggregator = true;
+ }
+ if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
+ inputTsBlock = null;
+ // For the last index of TsBlock, if we can know the aggregation
calculation is over
+ // we can directly updateResultTsBlock and return true
+ return isAllAggregatorsHasFinalResult(aggregators);
+ } else {
+ inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
+ return true;
}
-
- aggregator.processTsBlock(inputRegion, needProcess);
- }
- int lastReadRowIndex = lastIndexToProcess + 1;
- // If lastReadRowIndex is not zero, some of tsBlock is consumed and
result is cached in
- // aggregators.
- if (lastReadRowIndex != 0) {
- hasCachedDataInAggregator = true;
- }
- if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
- inputTsBlock = null;
- // For the last index of TsBlock, if we can know the aggregation
calculation is over
- // we can directly updateResultTsBlock and return true
- return isAllAggregatorsHasFinalResult(aggregators);
- } else {
- inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
- return true;
}
- }
- boolean isTsBlockOutOfBound =
windowManager.isTsBlockOutOfBound(inputTsBlock);
- return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound;
+ boolean isTsBlockOutOfBound =
windowManager.isTsBlockOutOfBound(inputTsBlock);
+ return isAllAggregatorsHasFinalResult(aggregators) ||
isTsBlockOutOfBound;
+ } finally {
+
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() -
startTime);
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
index adab0fb1d30..afa5c4c86ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java
@@ -123,12 +123,17 @@ public class TagAggregationOperator extends
AbstractConsumeAllOperator {
}
private void aggregate(List<TreeAggregator> aggregators, TsBlock[]
rowBlocks) {
- for (TreeAggregator aggregator : aggregators) {
- if (aggregator == null) {
- continue;
+ long startTime = System.nanoTime();
+ try {
+ for (TreeAggregator aggregator : aggregators) {
+ if (aggregator == null) {
+ continue;
+ }
+ aggregator.reset();
+ aggregator.processTsBlocks(rowBlocks);
}
- aggregator.reset();
- aggregator.processTsBlocks(rowBlocks);
+ } finally {
+
operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() -
startTime);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index c63cd24d50e..3f9db7bc165 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -245,18 +245,28 @@ public abstract class
AbstractSeriesAggregationScanOperator extends AbstractData
}
private boolean calcFromRawData(TsBlock tsBlock) {
- Pair<Boolean, TsBlock> calcResult =
- calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange,
ascending);
- inputTsBlock = calcResult.getRight();
- return calcResult.getLeft();
+ long startTime = System.nanoTime();
+ try {
+ Pair<Boolean, TsBlock> calcResult =
+ calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange,
ascending);
+ inputTsBlock = calcResult.getRight();
+ return calcResult.getLeft();
+ } finally {
+ operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() -
startTime);
+ }
}
protected void calcFromStatistics(Statistics timeStatistics, Statistics[]
valueStatistics) {
- for (TreeAggregator aggregator : aggregators) {
- if (aggregator.hasFinalResult()) {
- continue;
+ long startTime = System.nanoTime();
+ try {
+ for (TreeAggregator aggregator : aggregators) {
+ if (aggregator.hasFinalResult()) {
+ continue;
+ }
+ aggregator.processStatistics(timeStatistics, valueStatistics);
}
- aggregator.processStatistics(timeStatistics, valueStatistics);
+ } finally {
+
operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() -
startTime);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index 031c10dc8a4..856961601f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -285,28 +285,33 @@ public abstract class AbstractAggTableScanOperator
extends AbstractDataSourceOpe
return new Pair<>(false, inputTsBlock);
}
- updateCurTimeRange(inputTsBlock.getStartTime());
-
- TimeRange curTimeRange = timeIterator.getCurTimeRange();
- // check if the tsBlock does not contain points in current interval
- if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
- // skip points that cannot be calculated
- if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
- || (!ascending && inputTsBlock.getStartTime() >
curTimeRange.getMax())) {
- inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange,
ascending);
+ long startTime = System.nanoTime();
+ try {
+ updateCurTimeRange(inputTsBlock.getStartTime());
+
+ TimeRange curTimeRange = timeIterator.getCurTimeRange();
+ // check if the tsBlock does not contain points in current interval
+ if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
+ // skip points that cannot be calculated
+ if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
+ || (!ascending && inputTsBlock.getStartTime() >
curTimeRange.getMax())) {
+ inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange,
ascending);
+ }
+
+ inputTsBlock = process(inputTsBlock, curTimeRange);
}
- inputTsBlock = process(inputTsBlock, curTimeRange);
+ // judge whether the calculation finished
+ boolean isTsBlockOutOfBound =
+ inputTsBlock != null
+ && (ascending
+ ? inputTsBlock.getEndTime() > curTimeRange.getMax()
+ : inputTsBlock.getEndTime() < curTimeRange.getMin());
+ return new Pair<>(
+ isAllAggregatorsHasFinalResult(tableAggregators) ||
isTsBlockOutOfBound, inputTsBlock);
+ } finally {
+ operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() -
startTime);
}
-
- // judge whether the calculation finished
- boolean isTsBlockOutOfBound =
- inputTsBlock != null
- && (ascending
- ? inputTsBlock.getEndTime() > curTimeRange.getMax()
- : inputTsBlock.getEndTime() < curTimeRange.getMin());
- return new Pair<>(
- isAllAggregatorsHasFinalResult(tableAggregators) ||
isTsBlockOutOfBound, inputTsBlock);
}
private TsBlock process(TsBlock inputTsBlock, TimeRange curTimeRange) {
@@ -394,28 +399,32 @@ public abstract class AbstractAggTableScanOperator
extends AbstractDataSourceOpe
protected void calcFromStatistics(Statistics timeStatistics, Statistics[]
valueStatistics) {
int idx = -1;
+ long startTime = System.nanoTime();
+ try {
+ for (TableAggregator aggregator : tableAggregators) {
+ if (aggregator.hasFinalResult()) {
+ idx += aggregator.getChannelCount();
+ continue;
+ }
- for (TableAggregator aggregator : tableAggregators) {
- if (aggregator.hasFinalResult()) {
- idx += aggregator.getChannelCount();
- continue;
- }
+ Statistics[] statisticsArray = new
Statistics[aggregator.getChannelCount()];
+ for (int i = 0; i < aggregator.getChannelCount(); i++) {
+ idx++;
+
+ TsTableColumnCategory columnSchemaCategory =
+
aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
+ statisticsArray[i] =
+ buildStatistics(
+ columnSchemaCategory,
+ timeStatistics,
+ valueStatistics,
+ aggregatorInputChannels.get(idx));
+ }
- Statistics[] statisticsArray = new
Statistics[aggregator.getChannelCount()];
- for (int i = 0; i < aggregator.getChannelCount(); i++) {
- idx++;
-
- TsTableColumnCategory columnSchemaCategory =
-
aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
- statisticsArray[i] =
- buildStatistics(
- columnSchemaCategory,
- timeStatistics,
- valueStatistics,
- aggregatorInputChannels.get(idx));
+ aggregator.processStatistics(statisticsArray);
}
-
- aggregator.processStatistics(statisticsArray);
+ } finally {
+
operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() -
startTime);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
new file mode 100644
index 00000000000..0e621334621
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.execution.aggregation.Accumulator;
+import org.apache.iotdb.calc.execution.operator.CommonOperatorContext;
+import org.apache.iotdb.calc.execution.operator.Operator;
+import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAccumulator;
+import
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator;
+import org.apache.iotdb.calc.metric.QueryExecutionMetricSet;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.execution.aggregation.TreeAggregator;
+import
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.TagAggregationOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.metrics.DoNothingMetricService;
+import org.apache.iotdb.metrics.type.HistogramSnapshot;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.Test;
+
+import javax.management.ObjectName;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Collections.singletonList;
+import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class FragmentInstanceContextTest {
+
+ @Test
+ public void testDrainAggregationCostsSeparatelyOnce() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext context =
createFragmentInstanceContext(instanceId, stateMachine);
+
+ context.recordScanAggregationFromRawDataCost(10);
+ context.recordScanAggregationFromRawDataCost(20);
+ context.recordScanAggregationFromStatisticsCost(30);
+ context.recordAggregationOperatorFromRawDataCost(40);
+
+ assertEquals(30, context.drainScanAggregationFromRawDataCost());
+ assertEquals(30, context.drainScanAggregationFromStatisticsCost());
+ assertEquals(40, context.drainAggregationOperatorFromRawDataCost());
+
+ assertEquals(0, context.drainScanAggregationFromRawDataCost());
+ assertEquals(0, context.drainScanAggregationFromStatisticsCost());
+ assertEquals(0, context.drainAggregationOperatorFromRawDataCost());
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void
testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ FragmentInstanceContext context =
newFragmentInstanceContext(instanceNotificationExecutor);
+ DriverContext driverContext = new DriverContext(context, 0);
+ OperatorContext operatorContext =
+ driverContext.addOperatorContext(1, new PlanNodeId("forward"),
"forward");
+
+ operatorContext.recordScanAggregationFromRawDataCost(11);
+ operatorContext.recordScanAggregationFromStatisticsCost(13);
+ operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+ assertEquals(11, context.drainScanAggregationFromRawDataCost());
+ assertEquals(13, context.drainScanAggregationFromStatisticsCost());
+ assertEquals(17, context.drainAggregationOperatorFromRawDataCost());
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void
testOperatorContextWithoutFragmentInstanceAndCommonContextUseNoOpFallback() {
+ OperatorContext operatorContext =
+ new OperatorContext(1, new PlanNodeId("no-op"), "no-op", new
DriverContext());
+ operatorContext.recordScanAggregationFromRawDataCost(11);
+ operatorContext.recordScanAggregationFromStatisticsCost(13);
+ operatorContext.recordAggregationOperatorFromRawDataCost(17);
+
+ TestCommonOperatorContext commonOperatorContext = new
TestCommonOperatorContext();
+ commonOperatorContext.recordScanAggregationFromRawDataCost(19);
+ commonOperatorContext.recordScanAggregationFromStatisticsCost(23);
+ commonOperatorContext.recordAggregationOperatorFromRawDataCost(29);
+
+ assertEquals(0, commonOperatorContext.getTotalExecutionTimeInNanos());
+ assertNull(commonOperatorContext.getMemoryReservationContext());
+ }
+
+ @Test
+ public void testAggregationCostsFromOperatorsFlushToMetricStages() throws
Exception {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ RecordingMetricService metricService = new RecordingMetricService();
+ QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance();
+ metricSet.bindTo(metricService);
+ try {
+ FragmentInstanceContext context =
newFragmentInstanceContext(instanceNotificationExecutor);
+ DriverContext driverContext = new DriverContext(context, 0);
+ OperatorContext scanOperatorContext =
+ driverContext.addOperatorContext(1, new PlanNodeId("scan"), "scan");
+ TestSeriesAggregationScanOperator scanOperator =
+ new TestSeriesAggregationScanOperator(scanOperatorContext);
+
+ scanOperator.calculateRaw(longTsBlock());
+ scanOperator.calculateStatistics(
+ Statistics.getStatsByType(TSDataType.INT64),
+ new Statistics[] {Statistics.getStatsByType(TSDataType.INT64)});
+
+ OperatorContext aggregationOperatorContext =
+ driverContext.addOperatorContext(2, new PlanNodeId("aggregation"),
"aggregation");
+
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator
+ aggregationOperator =
newAggregationOperator(aggregationOperatorContext);
+ aggregationOperator.next();
+
+ context.recordAggregationCostToMetric();
+
+ assertEquals(1, metricService.count("raw_data"));
+ assertTrue(metricService.sum("raw_data") > 0);
+ assertEquals(1, metricService.count("statistics"));
+ assertTrue(metricService.sum("statistics") > 0);
+ assertEquals(1, metricService.count("raw_data_operator"));
+ assertTrue(metricService.sum("raw_data_operator") > 0);
+
+ context.recordAggregationCostToMetric();
+
+ assertEquals(1, metricService.count("raw_data"));
+ assertEquals(1, metricService.count("statistics"));
+ assertEquals(1, metricService.count("raw_data_operator"));
+ } finally {
+ metricSet.unbindFrom(metricService);
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void testAggregationOperatorRecordsTsBlockCostForIntermediateInput()
throws Exception {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ FragmentInstanceContext context =
newFragmentInstanceContext(instanceNotificationExecutor);
+ DriverContext driverContext = new DriverContext(context, 0);
+ OperatorContext operatorContext =
+ driverContext.addOperatorContext(1, new PlanNodeId("aggregation"),
"aggregation");
+
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator
+ aggregationOperator = newAggregationOperator(operatorContext);
+
+ assertNull(aggregationOperator.next());
+
+ assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void testTreeAggregationOperatorRecordsTsBlockCost() throws Exception
{
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ FragmentInstanceContext context =
newFragmentInstanceContext(instanceNotificationExecutor);
+ DriverContext driverContext = new DriverContext(context, 0);
+ OperatorContext operatorContext =
+ driverContext.addOperatorContext(1, new
PlanNodeId("tree-aggregation"), "aggregation");
+
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator
+ aggregationOperator =
+ new
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator(
+ operatorContext,
+ singletonList(newTreeFinalAggregator()),
+ new SingleTimeRangeIterator(new TimeRange(0, 10)),
+ singletonList(new SingleTsBlockOperator(operatorContext,
longTsBlock())),
+ false,
+ 1024);
+
+ aggregationOperator.isBlocked().get();
+ aggregationOperator.next();
+
+ assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void testTagAggregationOperatorRecordsTsBlockCost() throws Exception {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ FragmentInstanceContext context =
newFragmentInstanceContext(instanceNotificationExecutor);
+ DriverContext driverContext = new DriverContext(context, 0);
+ OperatorContext operatorContext =
+ driverContext.addOperatorContext(1, new
PlanNodeId("tag-aggregation"), "tag-aggregation");
+ TagAggregationOperator tagAggregationOperator =
+ new TagAggregationOperator(
+ operatorContext,
+ singletonList(singletonList("tag")),
+ singletonList(singletonList(newTreeFinalAggregator())),
+ singletonList(new SingleTsBlockOperator(operatorContext,
longTsBlock())),
+ 1024);
+
+ tagAggregationOperator.isBlocked().get();
+ tagAggregationOperator.next();
+
+ assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0);
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ private static FragmentInstanceContext newFragmentInstanceContext(
+ ExecutorService instanceNotificationExecutor) {
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ return createFragmentInstanceContext(instanceId, stateMachine);
+ }
+
+ private static TsBlock longTsBlock() {
+ TsBlockBuilder builder = new
TsBlockBuilder(singletonList(TSDataType.INT64));
+ builder.getTimeColumnBuilder().writeLong(0);
+ builder.getColumnBuilder(0).writeLong(1);
+ builder.declarePosition();
+ return builder.build();
+ }
+
+ private static
org.apache.iotdb.calc.execution.operator.source.relational.aggregation
+ .AggregationOperator
+ newAggregationOperator(OperatorContext operatorContext) {
+ TableAggregator aggregator =
+ new TableAggregator(
+ new TestTableAccumulator(),
+ AggregationNode.Step.INTERMEDIATE,
+ TSDataType.INT64,
+ singletonList(0),
+ OptionalInt.empty());
+ return new
org.apache.iotdb.calc.execution.operator.source.relational.aggregation
+ .AggregationOperator(
+ operatorContext,
+ new SingleTsBlockOperator(operatorContext, longTsBlock()),
+ singletonList(aggregator));
+ }
+
+ private static TreeAggregator newTreeFinalAggregator() {
+ return new TreeAggregator(
+ new TestTreeAccumulator(),
+ AggregationStep.FINAL,
+ singletonList(new InputLocation[] {new InputLocation(0, 0)}));
+ }
+
+ private static class TestSeriesAggregationScanOperator
+ extends AbstractSeriesAggregationScanOperator {
+
+ private TestSeriesAggregationScanOperator(OperatorContext operatorContext)
{
+ super(
+ new PlanNodeId("scan"),
+ operatorContext,
+ null,
+ 1,
+ singletonList(new TreeAggregator(new TestTreeAccumulator(),
AggregationStep.SINGLE)),
+ new SingleTimeRangeIterator(new TimeRange(0, 10)),
+ true,
+ false,
+ null,
+ 1024,
+ 0,
+ true);
+ this.curTimeRange = new TimeRange(0, 10);
+ }
+
+ private void calculateRaw(TsBlock tsBlock) {
+ this.inputTsBlock = tsBlock;
+ calcFromCachedData();
+ }
+
+ private void calculateStatistics(Statistics timeStatistics, Statistics[]
valueStatistics) {
+ calcFromStatistics(timeStatistics, valueStatistics);
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ }
+
+ private static class SingleTimeRangeIterator implements ITimeRangeIterator {
+ private final TimeRange timeRange;
+ private boolean consumed;
+
+ private SingleTimeRangeIterator(TimeRange timeRange) {
+ this.timeRange = timeRange;
+ }
+
+ @Override
+ public TimeRange getFirstTimeRange() {
+ return timeRange;
+ }
+
+ @Override
+ public boolean hasNextTimeRange() {
+ return !consumed;
+ }
+
+ @Override
+ public TimeRange nextTimeRange() {
+ consumed = true;
+ return timeRange;
+ }
+
+ @Override
+ public boolean isAscending() {
+ return true;
+ }
+
+ @Override
+ public long currentOutputTime() {
+ return timeRange.getMin();
+ }
+
+ @Override
+ public long getTotalIntervalNum() {
+ return 1;
+ }
+ }
+
+ private static class RecordingMetricService extends DoNothingMetricService {
+ private final Map<String, RecordingTimer> timers = new HashMap<>();
+
+ @Override
+ public Timer getOrCreateTimer(String metric, MetricLevel metricLevel,
String... tags) {
+ if (!Metric.AGGREGATION.toString().equals(metric)
+ || tags.length != 2
+ || !Tag.FROM.toString().equals(tags[0])) {
+ return super.getOrCreateTimer(metric, metricLevel, tags);
+ }
+ return timers.computeIfAbsent(tags[1], key -> new RecordingTimer());
+ }
+
+ @Override
+ public void remove(MetricType type, String metric, String... tags) {
+ if (Metric.AGGREGATION.toString().equals(metric)
+ && tags.length == 2
+ && Tag.FROM.toString().equals(tags[0])) {
+ timers.remove(tags[1]);
+ }
+ }
+
+ long count(String from) {
+ RecordingTimer timer = timers.get(from);
+ return timer == null ? 0 : timer.getCount();
+ }
+
+ long sum(String from) {
+ RecordingTimer timer = timers.get(from);
+ return timer == null ? 0 : timer.sum.get();
+ }
+ }
+
+ private static class RecordingTimer implements Timer {
+ private final AtomicLong count = new AtomicLong();
+ private final AtomicLong sum = new AtomicLong();
+
+ @Override
+ public void update(long duration, TimeUnit unit) {
+ count.incrementAndGet();
+ sum.addAndGet(unit.toNanos(duration));
+ }
+
+ @Override
+ public HistogramSnapshot takeSnapshot() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCount() {
+ return count.get();
+ }
+
+ @Override
+ public void setObjectName(ObjectName objectName) {
+ // no-op
+ }
+ }
+
+ private static class SingleTsBlockOperator implements Operator {
+ private final CommonOperatorContext operatorContext;
+ private final TsBlock tsBlock;
+ private boolean consumed;
+
+ private SingleTsBlockOperator(CommonOperatorContext operatorContext,
TsBlock tsBlock) {
+ this.operatorContext = operatorContext;
+ this.tsBlock = tsBlock;
+ }
+
+ @Override
+ public CommonOperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ consumed = true;
+ return tsBlock;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !consumed;
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+
+ @Override
+ public boolean isFinished() {
+ return consumed;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ }
+
+ private static class TestTableAccumulator implements TableAccumulator {
+ @Override
+ public long getEstimatedSize() {
+ return 0;
+ }
+
+ @Override
+ public TableAccumulator copy() {
+ return new TestTableAccumulator();
+ }
+
+ @Override
+ public void addInput(
+ Column[] arguments,
+
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationMask
+ mask) {
+ // no-op
+ }
+
+ @Override
+ public void addIntermediate(Column argument) {
+ // no-op
+ }
+
+ @Override
+ public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+ columnBuilder.writeLong(0);
+ }
+
+ @Override
+ public void evaluateFinal(ColumnBuilder columnBuilder) {
+ columnBuilder.writeLong(0);
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public void addStatistics(Statistics[] statistics) {
+ // no-op
+ }
+
+ @Override
+ public void reset() {
+ // no-op
+ }
+ }
+
+ private static class TestTreeAccumulator implements Accumulator {
+ @Override
+ public void addInput(Column[] columns, org.apache.tsfile.utils.BitMap
bitMap) {
+ // no-op
+ }
+
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ // no-op
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ // no-op
+ }
+
+ @Override
+ public void setFinal(Column finalResult) {
+ // no-op
+ }
+
+ @Override
+ public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {
+ tsBlockBuilder[0].writeLong(0);
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder tsBlockBuilder) {
+ tsBlockBuilder.writeLong(0);
+ }
+
+ @Override
+ public void reset() {
+ // no-op
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {TSDataType.INT64};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return TSDataType.INT64;
+ }
+ }
+
+ private static class TestCommonOperatorContext extends CommonOperatorContext
{
+ private TestCommonOperatorContext() {
+ super(0, new PlanNodeId("common"), "common");
+ }
+
+ @Override
+ public MemoryReservationManager getMemoryReservationContext() {
+ return null;
+ }
+
+ @Override
+ public int getFragmentId() {
+ return 0;
+ }
+
+ @Override
+ public int getPipelineId() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ }
+}