This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryMetrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d15fbff6c7475f36019a5c9dfa37a1eb3b314590 Author: JackieTien97 <[email protected]> AuthorDate: Mon Nov 7 14:45:17 2022 +0800 Add statistics for query --- .../fragment/FragmentInstanceContext.java | 10 ++++- .../fragment/FragmentInstanceManager.java | 7 ++++ .../db/mpp/execution/operator/OperatorContext.java | 11 ++++- .../operator/process/FilterAndProjectOperator.java | 34 +++++++++------ .../process/RawDataAggregationOperator.java | 11 ++++- .../process/SingleInputAggregationOperator.java | 23 ++++++++--- .../process/SlidingWindowAggregationOperator.java | 13 +++++- .../AbstractSeriesAggregationScanOperator.java | 42 ++++++++++--------- .../operator/source/AlignedSeriesScanOperator.java | 6 ++- .../operator/source/SeriesScanOperator.java | 6 ++- .../execution/operator/source/SeriesScanUtil.java | 3 ++ .../iotdb/db/mpp/statistics/QueryStatistics.java | 16 ++++++++ .../apache/iotdb/db/utils/QueryDataSetUtils.java | 48 +++++++++++++--------- 13 files changed, 167 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java index 286623529f..6ecb01df2f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.common.SessionInfo; import org.apache.iotdb.db.mpp.execution.driver.DriverContext; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.query.context.QueryContext; import org.slf4j.Logger; @@ -42,6 +43,9 @@ public class FragmentInstanceContext extends QueryContext { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class); private static final long END_TIME_INITIAL_VALUE = -1L; + + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + private final FragmentInstanceId id; // TODO if we split one fragment instance into multiple pipelines to run, we need to replace it @@ -157,7 +161,7 @@ public class FragmentInstanceContext extends QueryContext { } OperatorContext operatorContext = - new OperatorContext(operatorId, planNodeId, operatorType, this); + new OperatorContext(operatorId, planNodeId, operatorType, this, QUERY_STATISTICS); operatorContexts.add(operatorContext); return operatorContext; } @@ -223,4 +227,8 @@ public class FragmentInstanceContext extends QueryContext { public SessionInfo getSessionInfo() { return sessionInfo; } + + public void addOperationTime(String key, long costTimeInNanos) { + QUERY_STATISTICS.addCost(key, costTimeInNanos); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index 1d7499560f..318476e43d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler; import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.utils.SetThreadName; import io.airlift.stats.CounterStat; @@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_EXECUTION_PLANNER; public class FragmentInstanceManager { @@ -68,6 +70,8 @@ public class FragmentInstanceManager { private static final long QUERY_TIMEOUT_MS = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(); + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + public static FragmentInstanceManager getInstance() { return FragmentInstanceManager.InstanceHolder.INSTANCE; } @@ -96,6 +100,7 @@ public class FragmentInstanceManager { FragmentInstance instance, DataRegion dataRegion) { FragmentInstanceId instanceId = instance.getId(); + long startTime = System.nanoTime(); try (SetThreadName fragmentInstanceName = new SetThreadName(instanceId.getFullId())) { FragmentInstanceExecution execution = instanceExecution.computeIfAbsent( @@ -135,6 +140,8 @@ public class FragmentInstanceManager { }); return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId); + } finally { + QUERY_STATISTICS.addCost(LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java index 447066f4c8..bc4a718422 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/OperatorContext.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator; import org.apache.iotdb.db.mpp.common.SessionInfo; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import io.airlift.units.Duration; @@ -38,17 +39,21 @@ public class OperatorContext { private final String operatorType; private final FragmentInstanceContext instanceContext; + private final QueryStatistics queryStatistics; + private Duration maxRunTime; public OperatorContext( int operatorId, PlanNodeId planNodeId, String operatorType, - FragmentInstanceContext instanceContext) { + FragmentInstanceContext instanceContext, + QueryStatistics queryStatistics) { this.operatorId = operatorId; this.planNodeId = planNodeId; this.operatorType = operatorType; this.instanceContext = instanceContext; + this.queryStatistics = queryStatistics; } public int getOperatorId() { @@ -75,6 +80,10 @@ public class OperatorContext { return instanceContext.getSessionInfo(); } + public void addOperatorTime(String key, long costTimeInNanos) { + queryStatistics.addCost(key, costTimeInNanos); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java index eed0895a47..4c1ba15a47 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java @@ -43,19 +43,21 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.FILTER_AND_PROJECT_OPERATOR; + public class FilterAndProjectOperator implements ProcessOperator { private final Operator inputOperator; - private List<LeafColumnTransformer> filterLeafColumnTransformerList; + private final List<LeafColumnTransformer> filterLeafColumnTransformerList; - private ColumnTransformer filterOutputTransformer; + private final ColumnTransformer filterOutputTransformer; - private List<ColumnTransformer> commonTransformerList; + private final List<ColumnTransformer> commonTransformerList; - private List<LeafColumnTransformer> projectLeafColumnTransformerList; + private final List<LeafColumnTransformer> projectLeafColumnTransformerList; - private List<ColumnTransformer> projectOutputTransformerList; + private final List<ColumnTransformer> projectOutputTransformerList; private final TsBlockBuilder filterTsBlockBuilder; @@ -101,17 +103,23 @@ public class FilterAndProjectOperator implements ProcessOperator { return null; } - if (!hasFilter) { - return getTransformedTsBlock(input); - } + long startTime = System.nanoTime(); - TsBlock filterResult = getFilterTsBlock(input); + try { + if (!hasFilter) { + return getTransformedTsBlock(input); + } + + TsBlock filterResult = getFilterTsBlock(input); - // contains non-mappable udf, we leave calculation for TransformOperator - if (hasNonMappableUDF) { - return filterResult; + // contains non-mappable udf, we leave calculation for TransformOperator + if (hasNonMappableUDF) { + return filterResult; + } + return getTransformedTsBlock(filterResult); + } finally { + operatorContext.addOperatorTime(FILTER_AND_PROJECT_OPERATOR, System.nanoTime() - startTime); } - return getTransformedTsBlock(filterResult); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java index 30488c18d3..bce16f89d5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java @@ -68,7 +68,11 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator { @Override protected boolean calculateNextAggregationResult() { + long startTime = System.nanoTime(); + while (!calculateFromRawData()) { + long endTime = System.nanoTime(); + costTime += (endTime - startTime); inputTsBlock = null; // NOTE: child.next() can only be invoked once @@ -79,19 +83,24 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator { // if child still has next but can't be invoked now return false; } else { + startTime = System.nanoTime(); // If there are no points belong to last window, the last window will not // initialize window and aggregators if (!windowManager.isCurWindowInit()) { initWindowAndAggregators(); } + endTime = System.nanoTime(); + costTime += (endTime - startTime); + startTime = endTime; break; } + startTime = System.nanoTime(); } updateResultTsBlock(); // Step into next window windowManager.next(); - + costTime += (System.nanoTime() - startTime); return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java index 16071aea1e..894673d577 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java @@ -33,6 +33,8 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINGLE_INPUT_AGG_OPERATOR; + public abstract class SingleInputAggregationOperator implements ProcessOperator { protected final OperatorContext operatorContext; @@ -50,6 +52,8 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator protected final long maxRetainedSize; protected final long maxReturnSize; + protected long costTime = 0L; + public SingleInputAggregationOperator( OperatorContext operatorContext, List<Aggregator> aggregators, @@ -97,12 +101,19 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator } } - if (resultTsBlockBuilder.getPositionCount() > 0) { - TsBlock resultTsBlock = resultTsBlockBuilder.build(); - resultTsBlockBuilder.reset(); - return resultTsBlock; - } else { - return null; + start = System.nanoTime(); + try { + if (resultTsBlockBuilder.getPositionCount() > 0) { + TsBlock resultTsBlock = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return resultTsBlock; + } else { + return null; + } + } finally { + operatorContext.addOperatorTime( + SINGLE_INPUT_AGG_OPERATOR, costTime + (System.nanoTime() - start)); + costTime = 0; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java index 76499849cb..8a8ec2c813 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java @@ -65,6 +65,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper @Override protected boolean calculateNextAggregationResult() { + long startTime = System.nanoTime(); if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) { // move to next time window curTimeRange = timeRangeIterator.nextTimeRange(); @@ -75,7 +76,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper } } + long endTime = System.nanoTime(); + costTime += (endTime - startTime); + startTime = endTime; + while (!isCalculationDone()) { + costTime += (System.nanoTime() - startTime); if (inputTsBlock == null) { // NOTE: child.next() can only be invoked once if (child.hasNext() && canCallNext) { @@ -85,15 +91,20 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper // if child still has next but can't be invoked now return false; } else { + startTime = System.nanoTime(); break; } } - + startTime = System.nanoTime(); calculateFromCachedData(); + endTime = System.nanoTime(); + costTime += (endTime - startTime); + startTime = endTime; } // update result using aggregators updateResultTsBlock(); + costTime += (System.nanoTime() - startTime); return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java index 15ad856245..c565842d66 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGG_SCAN_OPERATOR; public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator { @@ -141,28 +142,31 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc // start stopwatch long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long start = System.nanoTime(); + try { + while (System.nanoTime() - start < maxRuntime + && timeRangeIterator.hasNextTimeRange() + && !resultTsBlockBuilder.isFull()) { + // move to next time window + curTimeRange = timeRangeIterator.nextTimeRange(); + + // clear previous aggregation result + for (Aggregator aggregator : aggregators) { + aggregator.updateTimeRange(curTimeRange); + } - while (System.nanoTime() - start < maxRuntime - && timeRangeIterator.hasNextTimeRange() - && !resultTsBlockBuilder.isFull()) { - // move to next time window - curTimeRange = timeRangeIterator.nextTimeRange(); - - // clear previous aggregation result - for (Aggregator aggregator : aggregators) { - aggregator.updateTimeRange(curTimeRange); + // calculate aggregation result on current time window + calculateNextAggregationResult(); } - // calculate aggregation result on current time window - calculateNextAggregationResult(); - } - - if (resultTsBlockBuilder.getPositionCount() > 0) { - TsBlock resultTsBlock = resultTsBlockBuilder.build(); - resultTsBlockBuilder.reset(); - return resultTsBlock; - } else { - return null; + if (resultTsBlockBuilder.getPositionCount() > 0) { + TsBlock resultTsBlock = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return resultTsBlock; + } else { + return null; + } + } finally { + operatorContext.addOperatorTime(AGG_SCAN_OPERATOR, System.nanoTime() - start); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java index 8406437802..e22d7b1f65 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java @@ -29,6 +29,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.io.IOException; import java.util.HashSet; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.ALIGNED_SERIES_SCAN_OPERATOR; + public class AlignedSeriesScanOperator implements DataSourceOperator { private final OperatorContext operatorContext; @@ -81,7 +83,7 @@ public class AlignedSeriesScanOperator implements DataSourceOperator { @Override public boolean hasNext() { - + long startTime = System.nanoTime(); try { if (hasCachedTsBlock) { return true; @@ -115,6 +117,8 @@ public class AlignedSeriesScanOperator implements DataSourceOperator { return hasCachedTsBlock; } catch (IOException e) { throw new RuntimeException("Error happened while scanning the file", e); + } finally { + operatorContext.addOperatorTime(ALIGNED_SERIES_SCAN_OPERATOR, System.nanoTime() - startTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java index 05685f758d..a993230378 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java @@ -30,6 +30,8 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.io.IOException; import java.util.Set; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SERIES_SCAN_OPERATOR; + public class SeriesScanOperator implements DataSourceOperator { private final OperatorContext operatorContext; @@ -82,7 +84,7 @@ public class SeriesScanOperator implements DataSourceOperator { @Override public boolean hasNext() { - + long startTime = System.nanoTime(); try { if (hasCachedTsBlock) { return true; @@ -116,6 +118,8 @@ public class SeriesScanOperator implements DataSourceOperator { return hasCachedTsBlock; } catch (IOException e) { throw new RuntimeException("Error happened while scanning the file", e); + } finally { + operatorContext.addOperatorTime(SERIES_SCAN_OPERATOR, System.nanoTime() - startTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index 22fd2cbabc..160a794230 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -58,6 +58,7 @@ import java.util.function.ToLongFunction; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.PAGE_READER; public class SeriesScanUtil { private final FragmentInstanceContext context; @@ -1124,10 +1125,12 @@ public class SeriesScanUtil { } TsBlock getAllSatisfiedPageData(boolean ascending) throws IOException { + long startTime = System.nanoTime(); TsBlock tsBlock = data.getAllSatisfiedData(); if (!ascending) { tsBlock.reverse(); } + context.addOperationTime(PAGE_READER, System.nanoTime() - startTime); return tsBlock; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java index e61e22f3ff..abe9b512bc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java @@ -46,6 +46,22 @@ public class QueryStatistics { private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>(); + public static final String LOCAL_EXECUTION_PLANNER = "LocalExecutionPlanner"; + + public static final String QUERY_EXECUTION = "QueryExecution"; + + public static final String SERIES_SCAN_OPERATOR = "SeriesScanOperator"; + + public static final String ALIGNED_SERIES_SCAN_OPERATOR = "AlignedSeriesScanOperator"; + + public static final String AGG_SCAN_OPERATOR = "AbstractSeriesAggregationScanOperator"; + + public static final String FILTER_AND_PROJECT_OPERATOR = "FilterAndProjectOperator"; + + public static final String SINGLE_INPUT_AGG_OPERATOR = "SingleInputAggregationOperator"; + + public static final String PAGE_READER = "IPageReader"; + private QueryStatistics() { ScheduledExecutorService scheduledExecutor = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print"); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index e0575880ea..a842859335 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; +import org.apache.iotdb.db.mpp.statistics.QueryStatistics; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -44,11 +45,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Optional; +import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_EXECUTION; + /** TimeValuePairUtils to convert between thrift format and TsFile format. */ public class QueryDataSetUtils { private static final int FLAG = 0x01; + private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance(); + private QueryDataSetUtils() {} @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @@ -381,27 +386,32 @@ public class QueryDataSetUtils { // To fetch required amounts of data and combine them through List public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize( IQueryExecution queryExecution, int fetchSize) throws IoTDBException { - int rowCount = 0; - List<ByteBuffer> res = new ArrayList<>(); - while (rowCount < fetchSize) { - Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult(); - if (!optionalByteBuffer.isPresent()) { - break; - } - ByteBuffer byteBuffer = optionalByteBuffer.get(); - byteBuffer.mark(); - int valueColumnCount = byteBuffer.getInt(); - for (int i = 0; i < valueColumnCount; i++) { - byteBuffer.get(); - } - int positionCount = byteBuffer.getInt(); - byteBuffer.reset(); - if (positionCount != 0) { - res.add(byteBuffer); + long startTime = System.nanoTime(); + try { + int rowCount = 0; + List<ByteBuffer> res = new ArrayList<>(); + while (rowCount < fetchSize) { + Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult(); + if (!optionalByteBuffer.isPresent()) { + break; + } + ByteBuffer byteBuffer = optionalByteBuffer.get(); + byteBuffer.mark(); + int valueColumnCount = byteBuffer.getInt(); + for (int i = 0; i < valueColumnCount; i++) { + byteBuffer.get(); + } + int positionCount = byteBuffer.getInt(); + byteBuffer.reset(); + if (positionCount != 0) { + res.add(byteBuffer); + } + rowCount += positionCount; } - rowCount += positionCount; + return new Pair<>(res, !queryExecution.hasNextResult()); + } finally { + QUERY_STATISTICS.addCost(QUERY_EXECUTION, System.nanoTime() - startTime); } - return new Pair<>(res, !queryExecution.hasNextResult()); } public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
