This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch fix-aggregation-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 87ac713007ab6234d844e34d91379ba3f8958c5b Author: JackieTien97 <[email protected]> AuthorDate: Wed Jun 24 14:53:55 2026 +0800 Fix aggregation metric recording --- .../execution/operator/CommonOperatorContext.java | 15 + .../aggregation/AggregationOperator.java | 15 +- .../relational/aggregation/TableAggregator.java | 49 +- .../grouped/StreamingAggregationOperator.java | 9 +- .../builder/InMemoryHashAggregationBuilder.java | 9 +- .../iotdb/calc/metric/QueryExecutionMetricSet.java | 15 +- .../execution/aggregation/TreeAggregator.java | 92 ++- .../fragment/FragmentInstanceContext.java | 53 ++ .../execution/operator/OperatorContext.java | 27 + .../operator/process/AggregationOperator.java | 25 +- .../process/RawDataAggregationOperator.java | 129 +++-- .../operator/process/TagAggregationOperator.java | 15 +- .../AbstractSeriesAggregationScanOperator.java | 26 +- .../relational/AbstractAggTableScanOperator.java | 85 +-- .../fragment/FragmentInstanceContextTest.java | 623 +++++++++++++++++++++ 15 files changed, 968 insertions(+), 219 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java index 2c99fa5d777..0ffe44716ce 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/CommonOperatorContext.java @@ -103,6 +103,21 @@ public abstract class CommonOperatorContext implements Accountable { this.totalExecutionTimeInNanos += executionTimeInNanos; } + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + // calc-commons operators can run in tests or standalone contexts that are not backed by a + // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs. + } + + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + // calc-commons operators can run in tests or standalone contexts that are not backed by a + // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs. + } + + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + // calc-commons operators can run in tests or standalone contexts that are not backed by a + // DataNode FragmentInstanceContext. DataNode OperatorContext overrides this to forward costs. + } + public void recordNextCalled() { this.nextCalledCount++; } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java index e65ee82517d..047894fa8bc 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/AggregationOperator.java @@ -92,9 +92,7 @@ public class AggregationOperator implements ProcessOperator { return null; } - for (TableAggregator aggregator : aggregators) { - aggregator.processBlock(block); - } + processBlock(block); return null; } else { @@ -127,6 +125,17 @@ public class AggregationOperator implements ProcessOperator { return operatorContext; } + private void processBlock(TsBlock block) { + long startTime = System.nanoTime(); + try { + for (TableAggregator aggregator : aggregators) { + aggregator.processBlock(block); + } + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); + } + } + @Override public long calculateMaxPeekMemory() { return Math.max( diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java index 1df3f6fb159..c7a7b6b7895 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableAggregator.java @@ -19,7 +19,6 @@ package org.apache.iotdb.calc.execution.operator.source.relational.aggregation; -import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode; @@ -36,13 +35,9 @@ import java.util.OptionalInt; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; -import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; public class TableAggregator { - public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS = - QueryExecutionMetricSet.getInstance(); - private final TableAccumulator accumulator; private final AggregationNode.Step step; private final TSDataType outputType; @@ -70,34 +65,28 @@ public class TableAggregator { } public void processBlock(TsBlock block) { - long startTime = System.nanoTime(); - try { - Column[] arguments = block.getColumns(inputChannels); - - // process count(*) - if (arguments.length == 0) { - arguments = - new Column[] { - new RunLengthEncodedColumn( - CommonOperatorUtils.TIME_COLUMN_TEMPLATE, block.getPositionCount()) - }; - } - - if (step.isInputRaw()) { - // Use select-all AggregationMask here because filter of Agg-Function is not supported now - AggregationMask mask = AggregationMask.createSelectAll(block.getPositionCount()); + Column[] arguments = block.getColumns(inputChannels); + + // process count(*) + if (arguments.length == 0) { + arguments = + new Column[] { + new RunLengthEncodedColumn( + CommonOperatorUtils.TIME_COLUMN_TEMPLATE, block.getPositionCount()) + }; + } - if (maskChannel.isPresent()) { - mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt())); - } + if (step.isInputRaw()) { + // Use select-all AggregationMask here because filter of Agg-Function is not supported now + AggregationMask mask = AggregationMask.createSelectAll(block.getPositionCount()); - accumulator.addInput(arguments, mask); - } else { - accumulator.addIntermediate(arguments[0]); + if (maskChannel.isPresent()) { + mask.applyMaskBlock(block.getColumn(maskChannel.getAsInt())); } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); + + accumulator.addInput(arguments, mask); + } else { + accumulator.addIntermediate(arguments[0]); } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java index 7609ac32926..9808a1b4eed 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java @@ -176,8 +176,13 @@ public class StreamingAggregationOperator extends AbstractOperator { private void addRowsToAggregators(TsBlock page, int startPosition, int endPosition) { TsBlock region = page.getRegion(startPosition, endPosition - startPosition + 1); - for (TableAggregator aggregator : aggregators) { - aggregator.processBlock(region); + long startTime = System.nanoTime(); + try { + for (TableAggregator aggregator : aggregators) { + aggregator.processBlock(region); + } + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java index f372717a92f..07e4cb9adbb 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/builder/InMemoryHashAggregationBuilder.java @@ -140,8 +140,13 @@ public class InMemoryHashAggregationBuilder implements HashAggregationBuilder { operatorContext.recordSpecifiedInfo(MAX_GROUP_NUMBER, Long.toString(groupCount)); maxGroupNumber = groupCount; } - for (GroupedAggregator groupedAggregator : groupedAggregators) { - groupedAggregator.processBlock(groupCount, groupByIdBlock, block); + long startTime = System.nanoTime(); + try { + for (GroupedAggregator groupedAggregator : groupedAggregators) { + groupedAggregator.processBlock(groupCount, groupByIdBlock, block); + } + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java index 636b11b4cc0..5452e544509 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/metric/QueryExecutionMetricSet.java @@ -143,8 +143,11 @@ public class QueryExecutionMetricSet implements IMetricSet { // region query aggregation public static final String AGGREGATION_FROM_RAW_DATA = "aggregation_from_raw_data"; public static final String AGGREGATION_FROM_STATISTICS = "aggregation_from_statistics"; + public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA = + "aggregation_operator_from_raw_data"; private Timer aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private void bindQueryAggregation(AbstractMetricService metricService) { aggregationFromRawDataTimer = @@ -156,12 +159,19 @@ public class QueryExecutionMetricSet implements IMetricSet { MetricLevel.IMPORTANT, Tag.FROM.toString(), "statistics"); + aggregationOperatorFromRawDataTimer = + metricService.getOrCreateTimer( + Metric.AGGREGATION.toString(), + MetricLevel.IMPORTANT, + Tag.FROM.toString(), + "raw_data_operator"); } private void unbindQueryAggregation(AbstractMetricService metricService) { aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER; - Arrays.asList("raw_data", "statistics") + aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + Arrays.asList("raw_data", "statistics", "raw_data_operator") .forEach( from -> metricService.remove( @@ -213,6 +223,9 @@ public class QueryExecutionMetricSet implements IMetricSet { case AGGREGATION_FROM_STATISTICS: aggregationFromStatisticsTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS); break; + case AGGREGATION_OPERATOR_FROM_RAW_DATA: + aggregationOperatorFromRawDataTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS); + break; default: break; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java index 07b482bbdfb..2e8650f50aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TreeAggregator.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.queryengine.execution.aggregation; import org.apache.iotdb.calc.execution.aggregation.Accumulator; -import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; import org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; @@ -35,8 +34,6 @@ import java.util.Collections; import java.util.List; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; -import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; public class TreeAggregator { @@ -44,8 +41,6 @@ public class TreeAggregator { // In some intermediate result input, inputLocation[] should include two columns protected List<InputLocation[]> inputLocationList; protected final AggregationStep step; - public static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS = - QueryExecutionMetricSet.getInstance(); // Used for SeriesAggregateScanOperator public TreeAggregator(Accumulator accumulator, AggregationStep step) { @@ -65,57 +60,44 @@ public class TreeAggregator { // Used for SeriesAggregateScanOperator and RawDataAggregateOperator public void processTsBlock(TsBlock tsBlock, BitMap bitMap) { - long startTime = System.nanoTime(); - try { - checkArgument( - step.isInputRaw(), - "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); - for (InputLocation[] inputLocations : inputLocationList) { - Column[] timeAndValueColumn = new Column[1 + inputLocations.length]; - timeAndValueColumn[0] = tsBlock.getTimeColumn(); - for (int i = 0; i < inputLocations.length; i++) { - checkArgument( - inputLocations[i].getTsBlockIndex() == 0, - "RawDataAggregateOperator can only process one tsBlock input."); - int index = inputLocations[i].getValueColumnIndex(); - // for count_time, time column is also its value column - // for max_by, the input column can also be time column. - timeAndValueColumn[1 + i] = - index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); - } - accumulator.addInput(timeAndValueColumn, bitMap); + checkArgument( + step.isInputRaw(), + "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); + for (InputLocation[] inputLocations : inputLocationList) { + Column[] timeAndValueColumn = new Column[1 + inputLocations.length]; + timeAndValueColumn[0] = tsBlock.getTimeColumn(); + for (int i = 0; i < inputLocations.length; i++) { + checkArgument( + inputLocations[i].getTsBlockIndex() == 0, + "RawDataAggregateOperator can only process one tsBlock input."); + int index = inputLocations[i].getValueColumnIndex(); + // for count_time, time column is also its value column + // for max_by, the input column can also be time column. + timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); + accumulator.addInput(timeAndValueColumn, bitMap); } } // Used for AggregateOperator public void processTsBlocks(TsBlock[] tsBlock) { - long startTime = System.nanoTime(); - try { - checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); - if (step.isInputFinal()) { - checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); - Column finalResult = - tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( - inputLocationList.get(0)[0].getValueColumnIndex()); - accumulator.setFinal(finalResult); - } else { - for (InputLocation[] inputLocations : inputLocationList) { - Column[] columns = new Column[inputLocations.length]; - for (int i = 0; i < inputLocations.length; i++) { - columns[i] = - tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( - inputLocations[i].getValueColumnIndex()); - } - accumulator.addIntermediate(columns); + checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); + if (step.isInputFinal()) { + checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); + Column finalResult = + tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( + inputLocationList.get(0)[0].getValueColumnIndex()); + accumulator.setFinal(finalResult); + } else { + for (InputLocation[] inputLocations : inputLocationList) { + Column[] columns = new Column[inputLocations.length]; + for (int i = 0; i < inputLocations.length; i++) { + columns[i] = + tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( + inputLocations[i].getValueColumnIndex()); } + accumulator.addIntermediate(columns); } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); } } @@ -129,16 +111,10 @@ public class TreeAggregator { /** Used for SeriesAggregateScanOperator. */ public void processStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { - long startTime = System.nanoTime(); - try { - for (InputLocation[] inputLocations : inputLocationList) { - int valueIndex = inputLocations[0].getValueColumnIndex(); - // valueIndex == -1 means it is count_time, we need to use timeStatistics - accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]); - } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime); + for (InputLocation[] inputLocations : inputLocationList) { + int valueIndex = inputLocations[0].getValueColumnIndex(); + // valueIndex == -1 means it is count_time, we need to use timeStatistics + accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 47eab012364..03d02456474 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.execution.fragment; import org.apache.iotdb.calc.exception.MemoryNotEnoughException; import org.apache.iotdb.calc.exception.QueryProcessException; +import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.audit.UserEntity; @@ -86,6 +87,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; +import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; +import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA; import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME; @@ -168,6 +172,9 @@ public class FragmentInstanceContext extends QueryContext { private int initQueryDataSourceRetryCount = 0; private final AtomicLong readyQueueTime = new AtomicLong(0); private final AtomicLong blockQueueTime = new AtomicLong(0); + private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0); + private final AtomicLong scanAggregationFromStatisticsCost = new AtomicLong(0); + private final AtomicLong aggregationOperatorFromRawDataCost = new AtomicLong(0); private long unclosedSeqFileNum = 0; private long unclosedUnseqFileNum = 0; private long closedSeqFileNum = 0; @@ -1117,6 +1124,8 @@ public class FragmentInstanceContext extends QueryContext { QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime); + recordAggregationCostToMetric(); + QueryResourceMetricSet.getInstance() .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount()); @@ -1264,6 +1273,50 @@ public class FragmentInstanceContext extends QueryContext { blockQueueTime.addAndGet(time); } + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + addCost(scanAggregationFromRawDataCost, costTimeInNanos); + } + + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + addCost(scanAggregationFromStatisticsCost, costTimeInNanos); + } + + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + addCost(aggregationOperatorFromRawDataCost, costTimeInNanos); + } + + private void addCost(AtomicLong cost, long costTimeInNanos) { + if (costTimeInNanos > 0) { + cost.addAndGet(costTimeInNanos); + } + } + + long drainScanAggregationFromRawDataCost() { + return scanAggregationFromRawDataCost.getAndSet(0); + } + + long drainScanAggregationFromStatisticsCost() { + return scanAggregationFromStatisticsCost.getAndSet(0); + } + + long drainAggregationOperatorFromRawDataCost() { + return aggregationOperatorFromRawDataCost.getAndSet(0); + } + + @TestOnly + void recordAggregationCostToMetric() { + recordAggregationCost(AGGREGATION_FROM_RAW_DATA, drainScanAggregationFromRawDataCost()); + recordAggregationCost(AGGREGATION_FROM_STATISTICS, drainScanAggregationFromStatisticsCost()); + recordAggregationCost( + AGGREGATION_OPERATOR_FROM_RAW_DATA, drainAggregationOperatorFromRawDataCost()); + } + + private void recordAggregationCost(String stage, long costTimeInNanos) { + if (costTimeInNanos > 0) { + QueryExecutionMetricSet.getInstance().recordExecutionCost(stage, costTimeInNanos); + } + } + public long getReadyQueueTime() { return readyQueueTime.get(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index d9761552e28..584cb4fe74c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -71,6 +71,33 @@ public class OperatorContext extends CommonOperatorContext { return getInstanceContext().getSessionInfo(); } + @Override + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordScanAggregationFromRawDataCost(costTimeInNanos); + } + } + + @Override + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordScanAggregationFromStatisticsCost(costTimeInNanos); + } + } + + @Override + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordAggregationOperatorFromRawDataCost(costTimeInNanos); + } + } + @Override public MemoryReservationManager getMemoryReservationContext() { return getInstanceContext().getMemoryReservationContext(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java index 51f6e1c8117..eec1c017430 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java @@ -154,19 +154,24 @@ public class AggregationOperator extends AbstractConsumeAllOperator { private void calculateNextAggregationResult() { // Consume current input tsBlocks - for (TreeAggregator aggregator : aggregators) { - aggregator.processTsBlocks(inputTsBlocks); - } + long startTime = System.nanoTime(); + try { + for (TreeAggregator aggregator : aggregators) { + aggregator.processTsBlocks(inputTsBlocks); + } - for (int i = 0; i < inputOperatorsCount; i++) { - inputTsBlocks[i] = inputTsBlocks[i].skipFirst(); - if (inputTsBlocks[i].isEmpty()) { - inputTsBlocks[i] = null; + for (int i = 0; i < inputOperatorsCount; i++) { + inputTsBlocks[i] = inputTsBlocks[i].skipFirst(); + if (inputTsBlocks[i].isEmpty()) { + inputTsBlocks[i] = null; + } } - } - // Update result using aggregators - updateResultTsBlock(); + // Update result using aggregators + updateResultTsBlock(); + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); + } } private void updateResultTsBlock() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java index 3018adfd5e1..385cc48e67c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java @@ -150,78 +150,83 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator { @SuppressWarnings({"squid:S3776", "squid:S135"}) private boolean calculateFromRawData() { - // if window is not initialized, we should init window status and reset aggregators - if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) { - return false; - } - - // If current window has been initialized, we should judge whether inputTsBlock is empty - if (inputTsBlock == null || inputTsBlock.isEmpty()) { - return false; - } - - if (windowManager.satisfiedCurWindow(inputTsBlock)) { - - // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row - // needed to be processed. - int tsBlockSize = inputTsBlock.getPositionCount(); - IWindow curWindow = windowManager.getCurWindow(); - - Column[] controlAndTimeColumn = new Column[2]; - controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock); - controlAndTimeColumn[1] = inputTsBlock.getTimeColumn(); + long startTime = System.nanoTime(); + try { + // if window is not initialized, we should init window status and reset aggregators + if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) { + return false; + } - BitMap needProcess = new BitMap(tsBlockSize); - int lastIndexToProcess = -1; - boolean hasSkip = false; + // If current window has been initialized, we should judge whether inputTsBlock is empty + if (inputTsBlock == null || inputTsBlock.isEmpty()) { + return false; + } - for (int i = 0; i < tsBlockSize; i++) { - if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) { + if (windowManager.satisfiedCurWindow(inputTsBlock)) { + + // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row + // needed to be processed. + int tsBlockSize = inputTsBlock.getPositionCount(); + IWindow curWindow = windowManager.getCurWindow(); + + Column[] controlAndTimeColumn = new Column[2]; + controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock); + controlAndTimeColumn[1] = inputTsBlock.getTimeColumn(); + + BitMap needProcess = new BitMap(tsBlockSize); + int lastIndexToProcess = -1; + boolean hasSkip = false; + + for (int i = 0; i < tsBlockSize; i++) { + if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) { + lastIndexToProcess = i; + hasSkip = true; + continue; + } + if (!curWindow.satisfy(controlAndTimeColumn[0], i)) { + break; + } + needProcess.mark(i); + curWindow.mergeOnePoint(controlAndTimeColumn, i); lastIndexToProcess = i; - hasSkip = true; - continue; } - if (!curWindow.satisfy(controlAndTimeColumn[0], i)) { - break; + + // if no row needs to skip, just send a null parameter. + if (!hasSkip) { + needProcess = null; } - needProcess.mark(i); - curWindow.mergeOnePoint(controlAndTimeColumn, i); - lastIndexToProcess = i; - } - // if no row needs to skip, just send a null parameter. - if (!hasSkip) { - needProcess = null; - } + TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); + for (TreeAggregator aggregator : aggregators) { + // Current agg method has been calculated + if (aggregator.hasFinalResult()) { + continue; + } - TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); - for (TreeAggregator aggregator : aggregators) { - // Current agg method has been calculated - if (aggregator.hasFinalResult()) { - continue; + aggregator.processTsBlock(inputRegion, needProcess); + } + int lastReadRowIndex = lastIndexToProcess + 1; + // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in + // aggregators. + if (lastReadRowIndex != 0) { + hasCachedDataInAggregator = true; + } + if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { + inputTsBlock = null; + // For the last index of TsBlock, if we can know the aggregation calculation is over + // we can directly updateResultTsBlock and return true + return isAllAggregatorsHasFinalResult(aggregators); + } else { + inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); + return true; } - - aggregator.processTsBlock(inputRegion, needProcess); - } - int lastReadRowIndex = lastIndexToProcess + 1; - // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in - // aggregators. - if (lastReadRowIndex != 0) { - hasCachedDataInAggregator = true; - } - if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { - inputTsBlock = null; - // For the last index of TsBlock, if we can know the aggregation calculation is over - // we can directly updateResultTsBlock and return true - return isAllAggregatorsHasFinalResult(aggregators); - } else { - inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); - return true; } - } - boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock); - return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound; + boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock); + return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound; + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java index adab0fb1d30..afa5c4c86ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java @@ -123,12 +123,17 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator { } private void aggregate(List<TreeAggregator> aggregators, TsBlock[] rowBlocks) { - for (TreeAggregator aggregator : aggregators) { - if (aggregator == null) { - continue; + long startTime = System.nanoTime(); + try { + for (TreeAggregator aggregator : aggregators) { + if (aggregator == null) { + continue; + } + aggregator.reset(); + aggregator.processTsBlocks(rowBlocks); } - aggregator.reset(); - aggregator.processTsBlocks(rowBlocks); + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java index c63cd24d50e..3f9db7bc165 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java @@ -245,18 +245,28 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData } private boolean calcFromRawData(TsBlock tsBlock) { - Pair<Boolean, TsBlock> calcResult = - calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); - inputTsBlock = calcResult.getRight(); - return calcResult.getLeft(); + long startTime = System.nanoTime(); + try { + Pair<Boolean, TsBlock> calcResult = + calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); + inputTsBlock = calcResult.getRight(); + return calcResult.getLeft(); + } finally { + operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime); + } } protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { - for (TreeAggregator aggregator : aggregators) { - if (aggregator.hasFinalResult()) { - continue; + long startTime = System.nanoTime(); + try { + for (TreeAggregator aggregator : aggregators) { + if (aggregator.hasFinalResult()) { + continue; + } + aggregator.processStatistics(timeStatistics, valueStatistics); } - aggregator.processStatistics(timeStatistics, valueStatistics); + } finally { + operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java index 031c10dc8a4..856961601f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java @@ -285,28 +285,33 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe return new Pair<>(false, inputTsBlock); } - updateCurTimeRange(inputTsBlock.getStartTime()); - - TimeRange curTimeRange = timeIterator.getCurTimeRange(); - // check if the tsBlock does not contain points in current interval - if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { - // skip points that cannot be calculated - if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin()) - || (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) { - inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending); + long startTime = System.nanoTime(); + try { + updateCurTimeRange(inputTsBlock.getStartTime()); + + TimeRange curTimeRange = timeIterator.getCurTimeRange(); + // check if the tsBlock does not contain points in current interval + if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { + // skip points that cannot be calculated + if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin()) + || (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) { + inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending); + } + + inputTsBlock = process(inputTsBlock, curTimeRange); } - inputTsBlock = process(inputTsBlock, curTimeRange); + // judge whether the calculation finished + boolean isTsBlockOutOfBound = + inputTsBlock != null + && (ascending + ? inputTsBlock.getEndTime() > curTimeRange.getMax() + : inputTsBlock.getEndTime() < curTimeRange.getMin()); + return new Pair<>( + isAllAggregatorsHasFinalResult(tableAggregators) || isTsBlockOutOfBound, inputTsBlock); + } finally { + operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime); } - - // judge whether the calculation finished - boolean isTsBlockOutOfBound = - inputTsBlock != null - && (ascending - ? inputTsBlock.getEndTime() > curTimeRange.getMax() - : inputTsBlock.getEndTime() < curTimeRange.getMin()); - return new Pair<>( - isAllAggregatorsHasFinalResult(tableAggregators) || isTsBlockOutOfBound, inputTsBlock); } private TsBlock process(TsBlock inputTsBlock, TimeRange curTimeRange) { @@ -394,28 +399,32 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { int idx = -1; + long startTime = System.nanoTime(); + try { + for (TableAggregator aggregator : tableAggregators) { + if (aggregator.hasFinalResult()) { + idx += aggregator.getChannelCount(); + continue; + } - for (TableAggregator aggregator : tableAggregators) { - if (aggregator.hasFinalResult()) { - idx += aggregator.getChannelCount(); - continue; - } + Statistics[] statisticsArray = new Statistics[aggregator.getChannelCount()]; + for (int i = 0; i < aggregator.getChannelCount(); i++) { + idx++; + + TsTableColumnCategory columnSchemaCategory = + aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory(); + statisticsArray[i] = + buildStatistics( + columnSchemaCategory, + timeStatistics, + valueStatistics, + aggregatorInputChannels.get(idx)); + } - Statistics[] statisticsArray = new Statistics[aggregator.getChannelCount()]; - for (int i = 0; i < aggregator.getChannelCount(); i++) { - idx++; - - TsTableColumnCategory columnSchemaCategory = - aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory(); - statisticsArray[i] = - buildStatistics( - columnSchemaCategory, - timeStatistics, - valueStatistics, - aggregatorInputChannels.get(idx)); + aggregator.processStatistics(statisticsArray); } - - aggregator.processStatistics(statisticsArray); + } finally { + operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java new file mode 100644 index 00000000000..0e621334621 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java @@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.fragment; + +import org.apache.iotdb.calc.execution.aggregation.Accumulator; +import org.apache.iotdb.calc.execution.operator.CommonOperatorContext; +import org.apache.iotdb.calc.execution.operator.Operator; +import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAccumulator; +import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator; +import org.apache.iotdb.calc.metric.QueryExecutionMetricSet; +import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.execution.aggregation.TreeAggregator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.TagAggregationOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; +import org.apache.iotdb.metrics.DoNothingMetricService; +import org.apache.iotdb.metrics.type.HistogramSnapshot; +import org.apache.iotdb.metrics.type.Timer; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.junit.Test; + +import javax.management.ObjectName; + +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalInt; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.singletonList; +import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class FragmentInstanceContextTest { + + @Test + public void testDrainAggregationCostsSeparatelyOnce() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext context = createFragmentInstanceContext(instanceId, stateMachine); + + context.recordScanAggregationFromRawDataCost(10); + context.recordScanAggregationFromRawDataCost(20); + context.recordScanAggregationFromStatisticsCost(30); + context.recordAggregationOperatorFromRawDataCost(40); + + assertEquals(30, context.drainScanAggregationFromRawDataCost()); + assertEquals(30, context.drainScanAggregationFromStatisticsCost()); + assertEquals(40, context.drainAggregationOperatorFromRawDataCost()); + + assertEquals(0, context.drainScanAggregationFromRawDataCost()); + assertEquals(0, context.drainScanAggregationFromStatisticsCost()); + assertEquals(0, context.drainAggregationOperatorFromRawDataCost()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("forward"), "forward"); + + operatorContext.recordScanAggregationFromRawDataCost(11); + operatorContext.recordScanAggregationFromStatisticsCost(13); + operatorContext.recordAggregationOperatorFromRawDataCost(17); + + assertEquals(11, context.drainScanAggregationFromRawDataCost()); + assertEquals(13, context.drainScanAggregationFromStatisticsCost()); + assertEquals(17, context.drainAggregationOperatorFromRawDataCost()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testOperatorContextWithoutFragmentInstanceAndCommonContextUseNoOpFallback() { + OperatorContext operatorContext = + new OperatorContext(1, new PlanNodeId("no-op"), "no-op", new DriverContext()); + operatorContext.recordScanAggregationFromRawDataCost(11); + operatorContext.recordScanAggregationFromStatisticsCost(13); + operatorContext.recordAggregationOperatorFromRawDataCost(17); + + TestCommonOperatorContext commonOperatorContext = new TestCommonOperatorContext(); + commonOperatorContext.recordScanAggregationFromRawDataCost(19); + commonOperatorContext.recordScanAggregationFromStatisticsCost(23); + commonOperatorContext.recordAggregationOperatorFromRawDataCost(29); + + assertEquals(0, commonOperatorContext.getTotalExecutionTimeInNanos()); + assertNull(commonOperatorContext.getMemoryReservationContext()); + } + + @Test + public void testAggregationCostsFromOperatorsFlushToMetricStages() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + RecordingMetricService metricService = new RecordingMetricService(); + QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance(); + metricSet.bindTo(metricService); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext scanOperatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("scan"), "scan"); + TestSeriesAggregationScanOperator scanOperator = + new TestSeriesAggregationScanOperator(scanOperatorContext); + + scanOperator.calculateRaw(longTsBlock()); + scanOperator.calculateStatistics( + Statistics.getStatsByType(TSDataType.INT64), + new Statistics[] {Statistics.getStatsByType(TSDataType.INT64)}); + + OperatorContext aggregationOperatorContext = + driverContext.addOperatorContext(2, new PlanNodeId("aggregation"), "aggregation"); + org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator + aggregationOperator = newAggregationOperator(aggregationOperatorContext); + aggregationOperator.next(); + + context.recordAggregationCostToMetric(); + + assertEquals(1, metricService.count("raw_data")); + assertTrue(metricService.sum("raw_data") > 0); + assertEquals(1, metricService.count("statistics")); + assertTrue(metricService.sum("statistics") > 0); + assertEquals(1, metricService.count("raw_data_operator")); + assertTrue(metricService.sum("raw_data_operator") > 0); + + context.recordAggregationCostToMetric(); + + assertEquals(1, metricService.count("raw_data")); + assertEquals(1, metricService.count("statistics")); + assertEquals(1, metricService.count("raw_data_operator")); + } finally { + metricSet.unbindFrom(metricService); + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testAggregationOperatorRecordsTsBlockCostForIntermediateInput() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("aggregation"), "aggregation"); + org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationOperator + aggregationOperator = newAggregationOperator(operatorContext); + + assertNull(aggregationOperator.next()); + + assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testTreeAggregationOperatorRecordsTsBlockCost() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("tree-aggregation"), "aggregation"); + org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator + aggregationOperator = + new org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator( + operatorContext, + singletonList(newTreeFinalAggregator()), + new SingleTimeRangeIterator(new TimeRange(0, 10)), + singletonList(new SingleTsBlockOperator(operatorContext, longTsBlock())), + false, + 1024); + + aggregationOperator.isBlocked().get(); + aggregationOperator.next(); + + assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testTagAggregationOperatorRecordsTsBlockCost() throws Exception { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("tag-aggregation"), "tag-aggregation"); + TagAggregationOperator tagAggregationOperator = + new TagAggregationOperator( + operatorContext, + singletonList(singletonList("tag")), + singletonList(singletonList(newTreeFinalAggregator())), + singletonList(new SingleTsBlockOperator(operatorContext, longTsBlock())), + 1024); + + tagAggregationOperator.isBlocked().get(); + tagAggregationOperator.next(); + + assertTrue(context.drainAggregationOperatorFromRawDataCost() > 0); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + private static FragmentInstanceContext newFragmentInstanceContext( + ExecutorService instanceNotificationExecutor) { + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + return createFragmentInstanceContext(instanceId, stateMachine); + } + + private static TsBlock longTsBlock() { + TsBlockBuilder builder = new TsBlockBuilder(singletonList(TSDataType.INT64)); + builder.getTimeColumnBuilder().writeLong(0); + builder.getColumnBuilder(0).writeLong(1); + builder.declarePosition(); + return builder.build(); + } + + private static org.apache.iotdb.calc.execution.operator.source.relational.aggregation + .AggregationOperator + newAggregationOperator(OperatorContext operatorContext) { + TableAggregator aggregator = + new TableAggregator( + new TestTableAccumulator(), + AggregationNode.Step.INTERMEDIATE, + TSDataType.INT64, + singletonList(0), + OptionalInt.empty()); + return new org.apache.iotdb.calc.execution.operator.source.relational.aggregation + .AggregationOperator( + operatorContext, + new SingleTsBlockOperator(operatorContext, longTsBlock()), + singletonList(aggregator)); + } + + private static TreeAggregator newTreeFinalAggregator() { + return new TreeAggregator( + new TestTreeAccumulator(), + AggregationStep.FINAL, + singletonList(new InputLocation[] {new InputLocation(0, 0)})); + } + + private static class TestSeriesAggregationScanOperator + extends AbstractSeriesAggregationScanOperator { + + private TestSeriesAggregationScanOperator(OperatorContext operatorContext) { + super( + new PlanNodeId("scan"), + operatorContext, + null, + 1, + singletonList(new TreeAggregator(new TestTreeAccumulator(), AggregationStep.SINGLE)), + new SingleTimeRangeIterator(new TimeRange(0, 10)), + true, + false, + null, + 1024, + 0, + true); + this.curTimeRange = new TimeRange(0, 10); + } + + private void calculateRaw(TsBlock tsBlock) { + this.inputTsBlock = tsBlock; + calcFromCachedData(); + } + + private void calculateStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { + calcFromStatistics(timeStatistics, valueStatistics); + } + + @Override + public long ramBytesUsed() { + return 0; + } + } + + private static class SingleTimeRangeIterator implements ITimeRangeIterator { + private final TimeRange timeRange; + private boolean consumed; + + private SingleTimeRangeIterator(TimeRange timeRange) { + this.timeRange = timeRange; + } + + @Override + public TimeRange getFirstTimeRange() { + return timeRange; + } + + @Override + public boolean hasNextTimeRange() { + return !consumed; + } + + @Override + public TimeRange nextTimeRange() { + consumed = true; + return timeRange; + } + + @Override + public boolean isAscending() { + return true; + } + + @Override + public long currentOutputTime() { + return timeRange.getMin(); + } + + @Override + public long getTotalIntervalNum() { + return 1; + } + } + + private static class RecordingMetricService extends DoNothingMetricService { + private final Map<String, RecordingTimer> timers = new HashMap<>(); + + @Override + public Timer getOrCreateTimer(String metric, MetricLevel metricLevel, String... tags) { + if (!Metric.AGGREGATION.toString().equals(metric) + || tags.length != 2 + || !Tag.FROM.toString().equals(tags[0])) { + return super.getOrCreateTimer(metric, metricLevel, tags); + } + return timers.computeIfAbsent(tags[1], key -> new RecordingTimer()); + } + + @Override + public void remove(MetricType type, String metric, String... tags) { + if (Metric.AGGREGATION.toString().equals(metric) + && tags.length == 2 + && Tag.FROM.toString().equals(tags[0])) { + timers.remove(tags[1]); + } + } + + long count(String from) { + RecordingTimer timer = timers.get(from); + return timer == null ? 0 : timer.getCount(); + } + + long sum(String from) { + RecordingTimer timer = timers.get(from); + return timer == null ? 0 : timer.sum.get(); + } + } + + private static class RecordingTimer implements Timer { + private final AtomicLong count = new AtomicLong(); + private final AtomicLong sum = new AtomicLong(); + + @Override + public void update(long duration, TimeUnit unit) { + count.incrementAndGet(); + sum.addAndGet(unit.toNanos(duration)); + } + + @Override + public HistogramSnapshot takeSnapshot() { + throw new UnsupportedOperationException(); + } + + @Override + public long getCount() { + return count.get(); + } + + @Override + public void setObjectName(ObjectName objectName) { + // no-op + } + } + + private static class SingleTsBlockOperator implements Operator { + private final CommonOperatorContext operatorContext; + private final TsBlock tsBlock; + private boolean consumed; + + private SingleTsBlockOperator(CommonOperatorContext operatorContext, TsBlock tsBlock) { + this.operatorContext = operatorContext; + this.tsBlock = tsBlock; + } + + @Override + public CommonOperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() { + consumed = true; + return tsBlock; + } + + @Override + public boolean hasNext() { + return !consumed; + } + + @Override + public void close() { + // no-op + } + + @Override + public boolean isFinished() { + return consumed; + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } + + private static class TestTableAccumulator implements TableAccumulator { + @Override + public long getEstimatedSize() { + return 0; + } + + @Override + public TableAccumulator copy() { + return new TestTableAccumulator(); + } + + @Override + public void addInput( + Column[] arguments, + org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationMask + mask) { + // no-op + } + + @Override + public void addIntermediate(Column argument) { + // no-op + } + + @Override + public void evaluateIntermediate(ColumnBuilder columnBuilder) { + columnBuilder.writeLong(0); + } + + @Override + public void evaluateFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeLong(0); + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public void addStatistics(Statistics[] statistics) { + // no-op + } + + @Override + public void reset() { + // no-op + } + } + + private static class TestTreeAccumulator implements Accumulator { + @Override + public void addInput(Column[] columns, org.apache.tsfile.utils.BitMap bitMap) { + // no-op + } + + @Override + public void addIntermediate(Column[] partialResult) { + // no-op + } + + @Override + public void addStatistics(Statistics statistics) { + // no-op + } + + @Override + public void setFinal(Column finalResult) { + // no-op + } + + @Override + public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) { + tsBlockBuilder[0].writeLong(0); + } + + @Override + public void outputFinal(ColumnBuilder tsBlockBuilder) { + tsBlockBuilder.writeLong(0); + } + + @Override + public void reset() { + // no-op + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {TSDataType.INT64}; + } + + @Override + public TSDataType getFinalType() { + return TSDataType.INT64; + } + } + + private static class TestCommonOperatorContext extends CommonOperatorContext { + private TestCommonOperatorContext() { + super(0, new PlanNodeId("common"), "common"); + } + + @Override + public MemoryReservationManager getMemoryReservationContext() { + return null; + } + + @Override + public int getFragmentId() { + return 0; + } + + @Override + public int getPipelineId() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } +}
