This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryImprovement in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f9a678e4bf55d6aee8741f86352bf09566cf5551 Author: JackieTien97 <[email protected]> AuthorDate: Sat Jan 29 10:21:01 2022 +0800 Refine the lock granularity of the query --- .../org/apache/iotdb/db/metadata/MManager.java | 18 ++--- .../query/dataset/groupby/GroupByFillDataSet.java | 23 +++--- .../groupby/GroupByWithValueFilterDataSet.java | 11 +-- .../groupby/GroupByWithoutValueFilterDataSet.java | 47 ++++++------ .../db/query/executor/AggregationExecutor.java | 38 ++++----- .../iotdb/db/query/executor/FillQueryExecutor.java | 89 +++++++++++----------- .../iotdb/db/query/executor/LastQueryExecutor.java | 32 ++++---- .../db/query/executor/RawDataQueryExecutor.java | 72 ++++++++--------- .../query/timegenerator/ServerTimeGenerator.java | 3 +- 9 files changed, 169 insertions(+), 164 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 4c66193..ad55efd 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -843,18 +843,18 @@ public class MManager { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, null); - - allMatchedNodes = - allMatchedNodes.stream() - .sorted( - Comparator.comparingLong( - (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context)) - .reversed() - .thenComparing(MNode::getFullPath)) - .collect(toList()); } finally { StorageEngine.getInstance().mergeUnLock(list); } + + allMatchedNodes = + allMatchedNodes.stream() + .sorted( + Comparator.comparingLong( + (MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context)) + .reversed() + .thenComparing(MNode::getFullPath)) + .collect(toList()); } catch (StorageEngineException | QueryProcessException e) { throw new MetadataException(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java index 23e6d0f..cf5c14c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java @@ -137,20 +137,21 @@ public class GroupByFillDataSet extends QueryDataSet { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - for (int i = 0; i < paths.size(); i++) { - PreviousFill fill = previousFillExecutors[i]; - firstNotNullTV[i] = fill.getFillResult(); - TimeValuePair timeValuePair = firstNotNullTV[i]; - previousValue[i] = null; - previousTime[i] = Long.MAX_VALUE; - if (ascending && timeValuePair != null && timeValuePair.getValue() != null) { - previousValue[i] = timeValuePair.getValue().getValue(); - previousTime[i] = timeValuePair.getTimestamp(); - } - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + + for (int i = 0; i < paths.size(); i++) { + PreviousFill fill = previousFillExecutors[i]; + firstNotNullTV[i] = fill.getFillResult(); + TimeValuePair timeValuePair = firstNotNullTV[i]; + previousValue[i] = null; + previousTime[i] = Long.MAX_VALUE; + if (ascending && timeValuePair != null && timeValuePair.getValue() != null) { + previousValue[i] = timeValuePair.getValue().getValue(); + previousTime[i] = timeValuePair.getTimestamp(); + } + } } private void initLastTimeArray(QueryContext context, GroupByTimeFillPlan groupByFillPlan) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java index 339ba09..0eaa004 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java @@ -105,14 +105,15 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - for (int i = 0; i < paths.size(); i++) { - PartialPath path = (PartialPath) paths.get(i); - allDataReaderList.add( - getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null)); - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + + for (int i = 0; i < paths.size(); i++) { + PartialPath path = (PartialPath) paths.get(i); + allDataReaderList.add( + getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null)); + } } protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 5628666..65480a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -102,32 +102,33 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - // init resultIndexes, group result indexes by path - for (int i = 0; i < paths.size(); i++) { - PartialPath path = (PartialPath) paths.get(i); - if (!pathExecutors.containsKey(path)) { - // init GroupByExecutor - pathExecutors.put( - path, - getGroupByExecutor( - path, - groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()), - dataTypes.get(i), - context, - timeFilter.copy(), - null, - groupByTimePlan.isAscending())); - resultIndexes.put(path, new ArrayList<>()); - } - resultIndexes.get(path).add(i); - AggregateResult aggrResult = - AggregateResultFactory.getAggrResultByName( - groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending); - pathExecutors.get(path).addAggregateResult(aggrResult); - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + + // init resultIndexes, group result indexes by path + for (int i = 0; i < paths.size(); i++) { + PartialPath path = (PartialPath) paths.get(i); + if (!pathExecutors.containsKey(path)) { + // init GroupByExecutor + pathExecutors.put( + path, + getGroupByExecutor( + path, + groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()), + dataTypes.get(i), + context, + timeFilter.copy(), + null, + groupByTimePlan.isAscending())); + resultIndexes.put(path, new ArrayList<>()); + } + resultIndexes.get(path).add(i); + AggregateResult aggrResult = + AggregateResultFactory.getAggrResultByName( + groupByTimePlan.getDeduplicatedAggregations().get(i), dataTypes.get(i), ascending); + pathExecutors.get(path).addAggregateResult(aggrResult); + } } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 6ce3328..9bd6f9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -115,19 +115,19 @@ public class AggregationExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - - for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { - aggregateOneSeries( - entry, - aggregateResultList, - aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()), - timeFilter, - context); - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { + aggregateOneSeries( + entry, + aggregateResultList, + aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()), + timeFilter, + context); + } + return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan); } @@ -367,20 +367,20 @@ public class AggregationExecutor { QueryResourceManager.getInstance() .initQueryDataSourceCache( processorToSeriesMap, context, timestampGenerator.getTimeFilter()); - - for (int i = 0; i < selectedSeries.size(); i++) { - PartialPath path = selectedSeries.get(i); - List<Integer> indexes = pathToAggrIndexesMap.remove(path); - if (indexes != null) { - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTime(path, queryPlan, dataTypes.get(i), context); - readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes); - } - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + for (int i = 0; i < selectedSeries.size(); i++) { + PartialPath path = selectedSeries.get(i); + List<Integer> indexes = pathToAggrIndexesMap.remove(path); + if (indexes != null) { + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTime(path, queryPlan, dataTypes.get(i), context); + readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes); + } + } + List<AggregateResult> aggregateResults = new ArrayList<>(); for (int i = 0; i < selectedSeries.size(); i++) { AggregateResult result = diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java index 662267e..20681d6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java @@ -89,53 +89,54 @@ public class FillQueryExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, contructTimeFilter()); - List<TimeValuePair> timeValuePairs = getTimeValuePairs(context); - long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval(); - for (int i = 0; i < selectedSeries.size(); i++) { - PartialPath path = selectedSeries.get(i); - TSDataType dataType = dataTypes.get(i); - - if (timeValuePairs.get(i) != null) { - // No need to fill - record.addField(timeValuePairs.get(i).getValue().getValue(), dataType); - continue; - } + } finally { + StorageEngine.getInstance().mergeUnLock(lockList); + } - IFill fill; - if (!typeIFillMap.containsKey(dataType)) { - switch (dataType) { - case INT32: - case INT64: - case FLOAT: - case DOUBLE: - case BOOLEAN: - case TEXT: - fill = new PreviousFill(dataType, queryTime, defaultFillInterval); - break; - default: - throw new UnsupportedDataTypeException("unsupported data type " + dataType); - } - } else { - fill = typeIFillMap.get(dataType).copy(); - } - fill = - configureFill( - fill, - path, - dataType, - queryTime, - plan.getAllMeasurementsInDevice(path.getDevice()), - context); - - TimeValuePair timeValuePair = fill.getFillResult(); - if (timeValuePair == null || timeValuePair.getValue() == null) { - record.addField(null); - } else { - record.addField(timeValuePair.getValue().getValue(), dataType); + List<TimeValuePair> timeValuePairs = getTimeValuePairs(context); + long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval(); + for (int i = 0; i < selectedSeries.size(); i++) { + PartialPath path = selectedSeries.get(i); + TSDataType dataType = dataTypes.get(i); + + if (timeValuePairs.get(i) != null) { + // No need to fill + record.addField(timeValuePairs.get(i).getValue().getValue(), dataType); + continue; + } + + IFill fill; + if (!typeIFillMap.containsKey(dataType)) { + switch (dataType) { + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case TEXT: + fill = new PreviousFill(dataType, queryTime, defaultFillInterval); + break; + default: + throw new UnsupportedDataTypeException("unsupported data type " + dataType); } + } else { + fill = typeIFillMap.get(dataType).copy(); + } + fill = + configureFill( + fill, + path, + dataType, + queryTime, + plan.getAllMeasurementsInDevice(path.getDevice()), + context); + + TimeValuePair timeValuePair = fill.getFillResult(); + if (timeValuePair == null || timeValuePair.getValue() == null) { + record.addField(null); + } else { + record.addField(timeValuePair.getValue().getValue(), dataType); } - } finally { - StorageEngine.getInstance().mergeUnLock(lockList); } SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes); diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java index f34f9f6..dfa9ed0 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java @@ -183,26 +183,26 @@ public class LastQueryExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, filter); - - for (int i = 0; i < nonCachedPaths.size(); i++) { - QueryDataSource dataSource = - QueryResourceManager.getInstance() - .getQueryDataSource(nonCachedPaths.get(i), context, filter); - LastPointReader lastReader = - new LastPointReader( - nonCachedPaths.get(i), - nonCachedDataTypes.get(i), - deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()), - context, - dataSource, - Long.MAX_VALUE, - filter); - readerList.add(lastReader); - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + for (int i = 0; i < nonCachedPaths.size(); i++) { + QueryDataSource dataSource = + QueryResourceManager.getInstance() + .getQueryDataSource(nonCachedPaths.get(i), context, filter); + LastPointReader lastReader = + new LastPointReader( + nonCachedPaths.get(i), + nonCachedDataTypes.get(i), + deviceMeasurementsMap.get(nonCachedPaths.get(i).getDevice()), + context, + dataSource, + Long.MAX_VALUE, + filter); + readerList.add(lastReader); + } + // Compute Last result for the rest series paths by scanning Tsfiles int index = 0; for (int i = 0; i < resultContainer.size(); i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index 92372be..0dc5ac8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -112,30 +112,31 @@ public class RawDataQueryExecutor { // init QueryDataSource cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { - PartialPath path = queryPlan.getDeduplicatedPaths().get(i); - TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i); - - QueryDataSource queryDataSource = - QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); - timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); - - ManagedSeriesReader reader = - new SeriesRawDataBatchReader( - path, - queryPlan.getAllMeasurementsInDevice(path.getDevice()), - dataType, - context, - queryDataSource, - timeFilter, - null, - null, - queryPlan.isAscending()); - readersOfSelectedSeries.add(reader); - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + + for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { + PartialPath path = queryPlan.getDeduplicatedPaths().get(i); + TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i); + + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); + timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + ManagedSeriesReader reader = + new SeriesRawDataBatchReader( + path, + queryPlan.getAllMeasurementsInDevice(path.getDevice()), + dataType, + context, + queryDataSource, + timeFilter, + null, + null, + queryPlan.isAscending()); + readersOfSelectedSeries.add(reader); + } return readersOfSelectedSeries; } @@ -185,23 +186,24 @@ public class RawDataQueryExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { - if (cached.get(i)) { - readersOfSelectedSeries.add(null); - continue; - } - PartialPath path = queryPlan.getDeduplicatedPaths().get(i); - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTimestamp( - path, - queryPlan.getAllMeasurementsInDevice(path.getDevice()), - queryPlan.getDeduplicatedDataTypes().get(i), - context); - readersOfSelectedSeries.add(seriesReaderByTimestamp); - } } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + + for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { + if (cached.get(i)) { + readersOfSelectedSeries.add(null); + continue; + } + PartialPath path = queryPlan.getDeduplicatedPaths().get(i); + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTimestamp( + path, + queryPlan.getAllMeasurementsInDevice(path.getDevice()), + queryPlan.getDeduplicatedDataTypes().get(i), + context); + readersOfSelectedSeries.add(seriesReaderByTimestamp); + } return readersOfSelectedSeries; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java index 8e4acc4..287483b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java @@ -90,11 +90,10 @@ public class ServerTimeGenerator extends TimeGenerator { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - - operatorNode = construct(expression); } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + operatorNode = construct(expression); } private Filter getPathListAndConstructTimeFilterFromExpression(
