This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch codex/cp-pr-18018-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fa7ad3493969268fa6eee4a5570a88e27120df72 Author: JackieTien97 <[email protected]> AuthorDate: Wed Jun 24 17:08:42 2026 +0800 Optimize aggregation metric recording --- .../execution/aggregation/Aggregator.java | 92 ++++------ .../fragment/FragmentInstanceContext.java | 52 ++++++ .../execution/operator/OperatorContext.java | 24 +++ .../operator/process/AggregationOperator.java | 25 +-- .../process/RawDataAggregationOperator.java | 129 ++++++------- .../operator/process/TagAggregationOperator.java | 15 +- .../AbstractSeriesAggregationScanOperator.java | 26 ++- .../metric/QueryExecutionMetricSet.java | 15 +- .../fragment/FragmentInstanceContextTest.java | 200 +++++++++++++++++++++ 9 files changed, 434 insertions(+), 144 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java index 6992d80b53f..5661f54a85a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Aggregator.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.execution.aggregation; -import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; @@ -34,8 +33,6 @@ import java.util.Collections; import java.util.List; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; -import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; public class Aggregator { @@ -43,8 +40,6 @@ public class Aggregator { // In some intermediate result input, inputLocation[] should include two columns protected List<InputLocation[]> inputLocationList; protected final AggregationStep step; - protected static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS = - QueryExecutionMetricSet.getInstance(); // Used for SeriesAggregateScanOperator public Aggregator(Accumulator accumulator, AggregationStep step) { @@ -64,57 +59,44 @@ public class Aggregator { // Used for SeriesAggregateScanOperator and RawDataAggregateOperator public void processTsBlock(TsBlock tsBlock, BitMap bitMap) { - long startTime = System.nanoTime(); - try { - checkArgument( - step.isInputRaw(), - "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); - for (InputLocation[] inputLocations : inputLocationList) { - Column[] timeAndValueColumn = new Column[1 + inputLocations.length]; - timeAndValueColumn[0] = tsBlock.getTimeColumn(); - for (int i = 0; i < inputLocations.length; i++) { - checkArgument( - inputLocations[i].getTsBlockIndex() == 0, - "RawDataAggregateOperator can only process one tsBlock input."); - int index = inputLocations[i].getValueColumnIndex(); - // for count_time, time column is also its value column - // for max_by, the input column can also be time column. - timeAndValueColumn[1 + i] = - index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); - } - accumulator.addInput(timeAndValueColumn, bitMap); + checkArgument( + step.isInputRaw(), + "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); + for (InputLocation[] inputLocations : inputLocationList) { + Column[] timeAndValueColumn = new Column[1 + inputLocations.length]; + timeAndValueColumn[0] = tsBlock.getTimeColumn(); + for (int i = 0; i < inputLocations.length; i++) { + checkArgument( + inputLocations[i].getTsBlockIndex() == 0, + "RawDataAggregateOperator can only process one tsBlock input."); + int index = inputLocations[i].getValueColumnIndex(); + // for count_time, time column is also its value column + // for max_by, the input column can also be time column. + timeAndValueColumn[1 + i] = index == -1 ? timeAndValueColumn[0] : tsBlock.getColumn(index); } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); + accumulator.addInput(timeAndValueColumn, bitMap); } } // Used for AggregateOperator public void processTsBlocks(TsBlock[] tsBlock) { - long startTime = System.nanoTime(); - try { - checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); - if (step.isInputFinal()) { - checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); - Column finalResult = - tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( - inputLocationList.get(0)[0].getValueColumnIndex()); - accumulator.setFinal(finalResult); - } else { - for (InputLocation[] inputLocations : inputLocationList) { - Column[] columns = new Column[inputLocations.length]; - for (int i = 0; i < inputLocations.length; i++) { - columns[i] = - tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( - inputLocations[i].getValueColumnIndex()); - } - accumulator.addIntermediate(columns); + checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); + if (step.isInputFinal()) { + checkArgument(inputLocationList.size() == 1, "Final output can only be single column"); + Column finalResult = + tsBlock[inputLocationList.get(0)[0].getTsBlockIndex()].getColumn( + inputLocationList.get(0)[0].getValueColumnIndex()); + accumulator.setFinal(finalResult); + } else { + for (InputLocation[] inputLocations : inputLocationList) { + Column[] columns = new Column[inputLocations.length]; + for (int i = 0; i < inputLocations.length; i++) { + columns[i] = + tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( + inputLocations[i].getValueColumnIndex()); } + accumulator.addIntermediate(columns); } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime); } } @@ -128,16 +110,10 @@ public class Aggregator { /** Used for SeriesAggregateScanOperator. */ public void processStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { - long startTime = System.nanoTime(); - try { - for (InputLocation[] inputLocations : inputLocationList) { - int valueIndex = inputLocations[0].getValueColumnIndex(); - // valueIndex == -1 means it is count_time, we need to use timeStatistics - accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]); - } - } finally { - QUERY_EXECUTION_METRICS.recordExecutionCost( - AGGREGATION_FROM_STATISTICS, System.nanoTime() - startTime); + for (InputLocation[] inputLocations : inputLocationList) { + int valueIndex = inputLocations[0].getValueColumnIndex(); + // valueIndex == -1 means it is count_time, we need to use timeStatistics + accumulator.addStatistics(valueIndex == -1 ? timeStatistics : valueStatistics[valueIndex]); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index a224e5b2c35..d579b7e15c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet; +import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; @@ -77,6 +78,9 @@ import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME; +import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_RAW_DATA; +import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_FROM_STATISTICS; +import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.AGGREGATION_OPERATOR_FROM_RAW_DATA; import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMPTY_QUERY_DATA_SOURCE; import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.UNFINISHED_QUERY_DATA_SOURCE; @@ -149,6 +153,9 @@ public class FragmentInstanceContext extends QueryContext { private int initQueryDataSourceRetryCount = 0; private final AtomicLong readyQueueTime = new AtomicLong(0); private final AtomicLong blockQueueTime = new AtomicLong(0); + private final AtomicLong scanAggregationFromRawDataCost = new AtomicLong(0); + private final AtomicLong scanAggregationFromStatisticsCost = new AtomicLong(0); + private final AtomicLong aggregationOperatorFromRawDataCost = new AtomicLong(0); private long unclosedSeqFileNum = 0; private long unclosedUnseqFileNum = 0; private long closedSeqFileNum = 0; @@ -970,6 +977,8 @@ public class FragmentInstanceContext extends QueryContext { QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceTime(durationTime); + recordAggregationCostToMetric(); + QueryResourceMetricSet.getInstance() .recordInitQueryResourceRetryCount(getInitQueryDataSourceRetryCount()); @@ -1115,6 +1124,49 @@ public class FragmentInstanceContext extends QueryContext { blockQueueTime.addAndGet(time); } + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + addCost(scanAggregationFromRawDataCost, costTimeInNanos); + } + + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + addCost(scanAggregationFromStatisticsCost, costTimeInNanos); + } + + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + addCost(aggregationOperatorFromRawDataCost, costTimeInNanos); + } + + private void addCost(AtomicLong cost, long costTimeInNanos) { + if (costTimeInNanos > 0) { + cost.addAndGet(costTimeInNanos); + } + } + + long drainScanAggregationFromRawDataCost() { + return scanAggregationFromRawDataCost.getAndSet(0); + } + + long drainScanAggregationFromStatisticsCost() { + return scanAggregationFromStatisticsCost.getAndSet(0); + } + + long drainAggregationOperatorFromRawDataCost() { + return aggregationOperatorFromRawDataCost.getAndSet(0); + } + + void recordAggregationCostToMetric() { + recordAggregationCost(AGGREGATION_FROM_RAW_DATA, drainScanAggregationFromRawDataCost()); + recordAggregationCost(AGGREGATION_FROM_STATISTICS, drainScanAggregationFromStatisticsCost()); + recordAggregationCost( + AGGREGATION_OPERATOR_FROM_RAW_DATA, drainAggregationOperatorFromRawDataCost()); + } + + private void recordAggregationCost(String stage, long costTimeInNanos) { + if (costTimeInNanos > 0) { + QueryExecutionMetricSet.getInstance().recordExecutionCost(stage, costTimeInNanos); + } + } + public long getReadyQueueTime() { return readyQueueTime.get(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index 49be9ad1cc0..90bded8a0ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -119,6 +119,30 @@ public class OperatorContext implements Accountable { return getInstanceContext().getSessionInfo(); } + public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordScanAggregationFromRawDataCost(costTimeInNanos); + } + } + + public void recordScanAggregationFromStatisticsCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordScanAggregationFromStatisticsCost(costTimeInNanos); + } + } + + public void recordAggregationOperatorFromRawDataCost(long costTimeInNanos) { + if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { + driverContext + .getFragmentInstanceContext() + .recordAggregationOperatorFromRawDataCost(costTimeInNanos); + } + } + public PlanNodeId getPlanNodeId() { return planNodeId; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java index ff1de243b27..ae36cc39c6b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationOperator.java @@ -153,19 +153,24 @@ public class AggregationOperator extends AbstractConsumeAllOperator { private void calculateNextAggregationResult() { // Consume current input tsBlocks - for (Aggregator aggregator : aggregators) { - aggregator.processTsBlocks(inputTsBlocks); - } + long startTime = System.nanoTime(); + try { + for (Aggregator aggregator : aggregators) { + aggregator.processTsBlocks(inputTsBlocks); + } - for (int i = 0; i < inputOperatorsCount; i++) { - inputTsBlocks[i] = inputTsBlocks[i].skipFirst(); - if (inputTsBlocks[i].isEmpty()) { - inputTsBlocks[i] = null; + for (int i = 0; i < inputOperatorsCount; i++) { + inputTsBlocks[i] = inputTsBlocks[i].skipFirst(); + if (inputTsBlocks[i].isEmpty()) { + inputTsBlocks[i] = null; + } } - } - // Update result using aggregators - updateResultTsBlock(); + // Update result using aggregators + updateResultTsBlock(); + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); + } } private void updateResultTsBlock() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java index 69f67503cc4..3ca36ff7d67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/RawDataAggregationOperator.java @@ -150,78 +150,83 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator { @SuppressWarnings({"squid:S3776", "squid:S135"}) private boolean calculateFromRawData() { - // if window is not initialized, we should init window status and reset aggregators - if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) { - return false; - } - - // If current window has been initialized, we should judge whether inputTsBlock is empty - if (inputTsBlock == null || inputTsBlock.isEmpty()) { - return false; - } - - if (windowManager.satisfiedCurWindow(inputTsBlock)) { - - // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row - // needed to be processed. - int tsBlockSize = inputTsBlock.getPositionCount(); - IWindow curWindow = windowManager.getCurWindow(); - - Column[] controlAndTimeColumn = new Column[2]; - controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock); - controlAndTimeColumn[1] = inputTsBlock.getTimeColumn(); + long startTime = System.nanoTime(); + try { + // if window is not initialized, we should init window status and reset aggregators + if (!windowManager.isCurWindowInit() && !skipPreviousWindowAndInitCurWindow()) { + return false; + } - BitMap needProcess = new BitMap(tsBlockSize); - int lastIndexToProcess = -1; - boolean hasSkip = false; + // If current window has been initialized, we should judge whether inputTsBlock is empty + if (inputTsBlock == null || inputTsBlock.isEmpty()) { + return false; + } - for (int i = 0; i < tsBlockSize; i++) { - if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) { + if (windowManager.satisfiedCurWindow(inputTsBlock)) { + + // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row + // needed to be processed. + int tsBlockSize = inputTsBlock.getPositionCount(); + IWindow curWindow = windowManager.getCurWindow(); + + Column[] controlAndTimeColumn = new Column[2]; + controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock); + controlAndTimeColumn[1] = inputTsBlock.getTimeColumn(); + + BitMap needProcess = new BitMap(tsBlockSize); + int lastIndexToProcess = -1; + boolean hasSkip = false; + + for (int i = 0; i < tsBlockSize; i++) { + if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) { + lastIndexToProcess = i; + hasSkip = true; + continue; + } + if (!curWindow.satisfy(controlAndTimeColumn[0], i)) { + break; + } + needProcess.mark(i); + curWindow.mergeOnePoint(controlAndTimeColumn, i); lastIndexToProcess = i; - hasSkip = true; - continue; } - if (!curWindow.satisfy(controlAndTimeColumn[0], i)) { - break; + + // if no row needs to skip, just send a null parameter. + if (!hasSkip) { + needProcess = null; } - needProcess.mark(i); - curWindow.mergeOnePoint(controlAndTimeColumn, i); - lastIndexToProcess = i; - } - // if no row needs to skip, just send a null parameter. - if (!hasSkip) { - needProcess = null; - } + TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); + for (Aggregator aggregator : aggregators) { + // Current agg method has been calculated + if (aggregator.hasFinalResult()) { + continue; + } - TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); - for (Aggregator aggregator : aggregators) { - // Current agg method has been calculated - if (aggregator.hasFinalResult()) { - continue; + aggregator.processTsBlock(inputRegion, needProcess); + } + int lastReadRowIndex = lastIndexToProcess + 1; + // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in + // aggregators. + if (lastReadRowIndex != 0) { + hasCachedDataInAggregator = true; + } + if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { + inputTsBlock = null; + // For the last index of TsBlock, if we can know the aggregation calculation is over + // we can directly updateResultTsBlock and return true + return isAllAggregatorsHasFinalResult(aggregators); + } else { + inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); + return true; } - - aggregator.processTsBlock(inputRegion, needProcess); - } - int lastReadRowIndex = lastIndexToProcess + 1; - // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in - // aggregators. - if (lastReadRowIndex != 0) { - hasCachedDataInAggregator = true; - } - if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { - inputTsBlock = null; - // For the last index of TsBlock, if we can know the aggregation calculation is over - // we can directly updateResultTsBlock and return true - return isAllAggregatorsHasFinalResult(aggregators); - } else { - inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); - return true; } - } - boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock); - return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound; + boolean isTsBlockOutOfBound = windowManager.isTsBlockOutOfBound(inputTsBlock); + return isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound; + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java index d7399fb4554..243b9ee0744 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TagAggregationOperator.java @@ -122,12 +122,17 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator { } private void aggregate(List<Aggregator> aggregators, TsBlock[] rowBlocks) { - for (Aggregator aggregator : aggregators) { - if (aggregator == null) { - continue; + long startTime = System.nanoTime(); + try { + for (Aggregator aggregator : aggregators) { + if (aggregator == null) { + continue; + } + aggregator.reset(); + aggregator.processTsBlocks(rowBlocks); } - aggregator.reset(); - aggregator.processTsBlocks(rowBlocks); + } finally { + operatorContext.recordAggregationOperatorFromRawDataCost(System.nanoTime() - startTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java index fe24513ae43..0d69b933ee7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java @@ -244,18 +244,28 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData } private boolean calcFromRawData(TsBlock tsBlock) { - Pair<Boolean, TsBlock> calcResult = - calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); - inputTsBlock = calcResult.getRight(); - return calcResult.getLeft(); + long startTime = System.nanoTime(); + try { + Pair<Boolean, TsBlock> calcResult = + calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending); + inputTsBlock = calcResult.getRight(); + return calcResult.getLeft(); + } finally { + operatorContext.recordScanAggregationFromRawDataCost(System.nanoTime() - startTime); + } } protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { - for (Aggregator aggregator : aggregators) { - if (aggregator.hasFinalResult()) { - continue; + long startTime = System.nanoTime(); + try { + for (Aggregator aggregator : aggregators) { + if (aggregator.hasFinalResult()) { + continue; + } + aggregator.processStatistics(timeStatistics, valueStatistics); } - aggregator.processStatistics(timeStatistics, valueStatistics); + } finally { + operatorContext.recordScanAggregationFromStatisticsCost(System.nanoTime() - startTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java index 27a238d3d8c..8766dc60624 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/QueryExecutionMetricSet.java @@ -143,8 +143,11 @@ public class QueryExecutionMetricSet implements IMetricSet { // region query aggregation public static final String AGGREGATION_FROM_RAW_DATA = "aggregation_from_raw_data"; public static final String AGGREGATION_FROM_STATISTICS = "aggregation_from_statistics"; + public static final String AGGREGATION_OPERATOR_FROM_RAW_DATA = + "aggregation_operator_from_raw_data"; private Timer aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private void bindQueryAggregation(AbstractMetricService metricService) { aggregationFromRawDataTimer = @@ -156,12 +159,19 @@ public class QueryExecutionMetricSet implements IMetricSet { MetricLevel.IMPORTANT, Tag.FROM.toString(), "statistics"); + aggregationOperatorFromRawDataTimer = + metricService.getOrCreateTimer( + Metric.AGGREGATION.toString(), + MetricLevel.IMPORTANT, + Tag.FROM.toString(), + "raw_data_operator"); } private void unbindQueryAggregation(AbstractMetricService metricService) { aggregationFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; aggregationFromStatisticsTimer = DoNothingMetricManager.DO_NOTHING_TIMER; - Arrays.asList("raw_data", "statistics") + aggregationOperatorFromRawDataTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + Arrays.asList("raw_data", "statistics", "raw_data_operator") .forEach( from -> metricService.remove( @@ -213,6 +223,9 @@ public class QueryExecutionMetricSet implements IMetricSet { case AGGREGATION_FROM_STATISTICS: aggregationFromStatisticsTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS); break; + case AGGREGATION_OPERATOR_FROM_RAW_DATA: + aggregationOperatorFromRawDataTimer.update(costTimeInNanos, TimeUnit.NANOSECONDS); + break; default: break; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java new file mode 100644 index 00000000000..6f085e6235b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContextTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.fragment; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.metrics.DoNothingMetricService; +import org.apache.iotdb.metrics.type.HistogramSnapshot; +import org.apache.iotdb.metrics.type.Timer; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import org.junit.Test; + +import javax.management.ObjectName; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.junit.Assert.assertEquals; + +public class FragmentInstanceContextTest { + + @Test + public void testDrainAggregationCostsSeparatelyOnce() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + + context.recordScanAggregationFromRawDataCost(10); + context.recordScanAggregationFromRawDataCost(20); + context.recordScanAggregationFromStatisticsCost(30); + context.recordAggregationOperatorFromRawDataCost(40); + + assertEquals(30, context.drainScanAggregationFromRawDataCost()); + assertEquals(30, context.drainScanAggregationFromStatisticsCost()); + assertEquals(40, context.drainAggregationOperatorFromRawDataCost()); + + assertEquals(0, context.drainScanAggregationFromRawDataCost()); + assertEquals(0, context.drainScanAggregationFromStatisticsCost()); + assertEquals(0, context.drainAggregationOperatorFromRawDataCost()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testOperatorContextForwardsAggregationCostsToFragmentInstanceContext() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + DriverContext driverContext = new DriverContext(context, 0); + OperatorContext operatorContext = + driverContext.addOperatorContext(1, new PlanNodeId("aggregation"), "aggregation"); + + operatorContext.recordScanAggregationFromRawDataCost(11); + operatorContext.recordScanAggregationFromStatisticsCost(13); + operatorContext.recordAggregationOperatorFromRawDataCost(17); + + assertEquals(11, context.drainScanAggregationFromRawDataCost()); + assertEquals(13, context.drainScanAggregationFromStatisticsCost()); + assertEquals(17, context.drainAggregationOperatorFromRawDataCost()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void testAggregationCostsFlushToMetricStagesOnce() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + RecordingMetricService metricService = new RecordingMetricService(); + QueryExecutionMetricSet metricSet = QueryExecutionMetricSet.getInstance(); + metricSet.bindTo(metricService); + try { + FragmentInstanceContext context = newFragmentInstanceContext(instanceNotificationExecutor); + + context.recordScanAggregationFromRawDataCost(10); + context.recordScanAggregationFromStatisticsCost(20); + context.recordAggregationOperatorFromRawDataCost(30); + + context.recordAggregationCostToMetric(); + + assertEquals(1, metricService.count("raw_data")); + assertEquals(10, metricService.sum("raw_data")); + assertEquals(1, metricService.count("statistics")); + assertEquals(20, metricService.sum("statistics")); + assertEquals(1, metricService.count("raw_data_operator")); + assertEquals(30, metricService.sum("raw_data_operator")); + + context.recordAggregationCostToMetric(); + + assertEquals(1, metricService.count("raw_data")); + assertEquals(1, metricService.count("statistics")); + assertEquals(1, metricService.count("raw_data_operator")); + } finally { + metricSet.unbindFrom(metricService); + instanceNotificationExecutor.shutdown(); + } + } + + private static FragmentInstanceContext newFragmentInstanceContext( + ExecutorService instanceNotificationExecutor) { + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(new QueryId("test_query"), 0), "0"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + return createFragmentInstanceContext(instanceId, stateMachine); + } + + private static class RecordingMetricService extends DoNothingMetricService { + private final Map<String, RecordingTimer> timers = new HashMap<>(); + + @Override + public Timer getOrCreateTimer(String metric, MetricLevel metricLevel, String... tags) { + if (!Metric.AGGREGATION.toString().equals(metric) + || tags.length != 2 + || !Tag.FROM.toString().equals(tags[0])) { + return super.getOrCreateTimer(metric, metricLevel, tags); + } + return timers.computeIfAbsent(tags[1], key -> new RecordingTimer()); + } + + @Override + public void remove(MetricType type, String metric, String... tags) { + if (Metric.AGGREGATION.toString().equals(metric) + && tags.length == 2 + && Tag.FROM.toString().equals(tags[0])) { + timers.remove(tags[1]); + } + } + + long count(String from) { + RecordingTimer timer = timers.get(from); + return timer == null ? 0 : timer.getCount(); + } + + long sum(String from) { + RecordingTimer timer = timers.get(from); + return timer == null ? 0 : timer.sum.get(); + } + } + + private static class RecordingTimer implements Timer { + private final AtomicLong count = new AtomicLong(); + private final AtomicLong sum = new AtomicLong(); + + @Override + public void update(long duration, TimeUnit unit) { + count.incrementAndGet(); + sum.addAndGet(unit.toNanos(duration)); + } + + @Override + public HistogramSnapshot takeSnapshot() { + throw new UnsupportedOperationException(); + } + + @Override + public long getCount() { + return count.get(); + } + + @Override + public void setObjectName(ObjectName objectName) { + // no-op + } + } +}
