This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryLockRefine in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c4625afdc9bc5837a0b09e21c3d24567abfd96c0 Author: JackieTien97 <[email protected]> AuthorDate: Sat Jan 29 10:43:43 2022 +0800 Refine the lock granularity of the query --- .../apache/iotdb/db/metadata/tag/TagManager.java | 29 +++---- .../groupby/GroupByWithValueFilterDataSet.java | 42 ++++++----- .../groupby/GroupByWithoutValueFilterDataSet.java | 88 +++++++++++----------- .../db/query/executor/AggregationExecutor.java | 80 +++++++++++--------- .../iotdb/db/query/executor/FillQueryExecutor.java | 72 +++++++++--------- .../iotdb/db/query/executor/LastQueryExecutor.java | 39 +++++----- .../db/query/executor/RawDataQueryExecutor.java | 47 +++++++----- .../query/timegenerator/ServerTimeGenerator.java | 11 ++- 8 files changed, 228 insertions(+), 180 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java index cb0da36..c597b72 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java @@ -172,23 +172,26 @@ public class TagManager { Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap = lockListAndProcessorToSeriesMapPair.right; - // init QueryDataSource cache - QueryResourceManager.getInstance() - .initQueryDataSourceCache(processorToSeriesMap, context, null); - try { - allMatchedNodes = - allMatchedNodes.stream() - .sorted( - Comparator.comparingLong( - (IMeasurementMNode mNode) -> - LastCacheManager.getLastTimeStamp(mNode, context)) - .reversed() - .thenComparing(IMNode::getFullPath)) - .collect(toList()); + // init QueryDataSource cache + QueryResourceManager.getInstance() + .initQueryDataSourceCache(processorToSeriesMap, context, null); + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); } finally { StorageEngine.getInstance().mergeUnLock(list); } + + allMatchedNodes = + allMatchedNodes.stream() + .sorted( + Comparator.comparingLong( + (IMeasurementMNode mNode) -> + LastCacheManager.getLastTimeStamp(mNode, context)) + .reversed() + .thenComparing(IMNode::getFullPath)) + .collect(toList()); } catch (StorageEngineException | QueryProcessException e) { throw new MetadataException(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java index feee017..c90fbd9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java @@ -47,6 +47,9 @@ import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -58,6 +61,8 @@ import java.util.Map.Entry; public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { + private static final Logger logger = LoggerFactory.getLogger(GroupByWithValueFilterDataSet.class); + private Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap; protected GroupByTimePlan groupByTimePlan; @@ -122,27 +127,30 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - // init non-aligned series reader - for (PartialPath path : pathToAggrIndexesMap.keySet()) { - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTime(path, groupByTimePlan, context); - readerToAggrIndexesMap.put( - seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path))); - } - // init aligned series reader - for (PartialPath alignedPath : alignedPathToAggrIndexesMap.keySet()) { - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTime(alignedPath, groupByTimePlan, context); - readerToAggrIndexesMap.put( - seriesReaderByTimestamp, alignedPathToAggrIndexesMap.get(alignedPath)); - } + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); } finally { StorageEngine.getInstance().mergeUnLock(lockList); + } - // assign null to be friendly for GC - pathToAggrIndexesMap = null; - alignedPathToAggrIndexesMap = null; + // init non-aligned series reader + for (PartialPath path : pathToAggrIndexesMap.keySet()) { + IReaderByTimestamp seriesReaderByTimestamp = getReaderByTime(path, groupByTimePlan, context); + readerToAggrIndexesMap.put( + seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path))); + } + // assign null to be friendly for GC + pathToAggrIndexesMap = null; + // init aligned series reader + for (PartialPath alignedPath : alignedPathToAggrIndexesMap.keySet()) { + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTime(alignedPath, groupByTimePlan, context); + readerToAggrIndexesMap.put( + seriesReaderByTimestamp, alignedPathToAggrIndexesMap.get(alignedPath)); } + // assign null to be friendly for GC + alignedPathToAggrIndexesMap = null; } protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 122aca6..cfec005 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -120,54 +120,58 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - // init GroupByExecutor for non-aligned series - for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { - MeasurementPath path = (MeasurementPath) entry.getKey(); - List<Integer> indexes = entry.getValue(); - if (!pathExecutors.containsKey(path)) { - pathExecutors.put( - path, - getGroupByExecutor( - path, - groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()), - context, - timeFilter.copy(), - null, - ascending)); - } - for (int index : indexes) { + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); + } finally { + StorageEngine.getInstance().mergeUnLock(lockList); + } + + // init GroupByExecutor for non-aligned series + for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { + MeasurementPath path = (MeasurementPath) entry.getKey(); + List<Integer> indexes = entry.getValue(); + if (!pathExecutors.containsKey(path)) { + pathExecutors.put( + path, + getGroupByExecutor( + path, + groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()), + context, + timeFilter.copy(), + null, + ascending)); + } + for (int index : indexes) { + AggregateResult aggrResult = + AggregateResultFactory.getAggrResultByName( + groupByTimePlan.getDeduplicatedAggregations().get(index), + path.getSeriesType(), + ascending); + pathExecutors.get(path).addAggregateResult(aggrResult); + } + } + // init GroupByExecutor for aligned series + for (Map.Entry<AlignedPath, List<List<Integer>>> entry : + alignedPathToAggrIndexesMap.entrySet()) { + AlignedPath path = entry.getKey(); + List<List<Integer>> indexesList = entry.getValue(); + if (!alignedPathExecutors.containsKey(path)) { + alignedPathExecutors.put( + path, getAlignedGroupByExecutor(path, context, timeFilter.copy(), null, ascending)); + } + for (int i = 0; i < path.getMeasurementList().size(); i++) { + List<AggregateResult> aggrResultList = new ArrayList<>(); + for (int index : indexesList.get(i)) { AggregateResult aggrResult = AggregateResultFactory.getAggrResultByName( groupByTimePlan.getDeduplicatedAggregations().get(index), - path.getSeriesType(), + path.getSchemaList().get(i).getType(), ascending); - pathExecutors.get(path).addAggregateResult(aggrResult); + aggrResultList.add(aggrResult); } + alignedPathExecutors.get(path).addAggregateResult(aggrResultList); } - // init GroupByExecutor for aligned series - for (Map.Entry<AlignedPath, List<List<Integer>>> entry : - alignedPathToAggrIndexesMap.entrySet()) { - AlignedPath path = entry.getKey(); - List<List<Integer>> indexesList = entry.getValue(); - if (!alignedPathExecutors.containsKey(path)) { - alignedPathExecutors.put( - path, getAlignedGroupByExecutor(path, context, timeFilter.copy(), null, ascending)); - } - for (int i = 0; i < path.getMeasurementList().size(); i++) { - List<AggregateResult> aggrResultList = new ArrayList<>(); - for (int index : indexesList.get(i)) { - AggregateResult aggrResult = - AggregateResultFactory.getAggrResultByName( - groupByTimePlan.getDeduplicatedAggregations().get(index), - path.getSchemaList().get(i).getType(), - ascending); - aggrResultList.add(aggrResult); - } - alignedPathExecutors.get(path).addAggregateResult(aggrResultList); - } - } - } finally { - StorageEngine.getInstance().mergeUnLock(lockList); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 8d7bbf9..2f4e65e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -59,6 +59,9 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -74,6 +77,8 @@ import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenera @SuppressWarnings("java:S1135") // ignore todos public class AggregationExecutor { + private static final Logger logger = LoggerFactory.getLogger(AggregationExecutor.class); + private List<PartialPath> selectedSeries; protected List<TSDataType> dataTypes; protected List<String> aggregations; @@ -132,28 +137,31 @@ public class AggregationExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - - for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { - PartialPath seriesPath = entry.getKey(); - aggregateOneSeries( - seriesPath, - entry.getValue(), - aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()), - timeFilter); - } - for (Map.Entry<AlignedPath, List<List<Integer>>> entry : - alignedPathToAggrIndexesMap.entrySet()) { - AlignedPath alignedPath = entry.getKey(); - aggregateOneAlignedSeries( - alignedPath, - entry.getValue(), - aggregationPlan.getAllMeasurementsInDevice(alignedPath.getDevice()), - timeFilter); - } + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { + PartialPath seriesPath = entry.getKey(); + aggregateOneSeries( + seriesPath, + entry.getValue(), + aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()), + timeFilter); + } + for (Map.Entry<AlignedPath, List<List<Integer>>> entry : + alignedPathToAggrIndexesMap.entrySet()) { + AlignedPath alignedPath = entry.getKey(); + aggregateOneAlignedSeries( + alignedPath, + entry.getValue(), + aggregationPlan.getAllMeasurementsInDevice(alignedPath.getDevice()), + timeFilter); + } + return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan); } @@ -639,26 +647,30 @@ public class AggregationExecutor { QueryResourceManager.getInstance() .initQueryDataSourceCache( processorToSeriesMap, context, timestampGenerator.getTimeFilter()); - - for (PartialPath path : pathToAggrIndexesMap.keySet()) { - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTime(path, queryPlan, path.getSeriesType(), context); - readerToAggrIndexesMap.put( - seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path))); - } - // assign null to be friendly for GC - pathToAggrIndexesMap = null; - for (AlignedPath vectorPath : alignedPathToAggrIndexesMap.keySet()) { - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context); - readerToAggrIndexesMap.put( - seriesReaderByTimestamp, alignedPathToAggrIndexesMap.get(vectorPath)); - } - alignedPathToAggrIndexesMap = null; + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + for (PartialPath path : pathToAggrIndexesMap.keySet()) { + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTime(path, queryPlan, path.getSeriesType(), context); + readerToAggrIndexesMap.put( + seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path))); + } + // assign null to be friendly for GC + pathToAggrIndexesMap = null; + for (AlignedPath vectorPath : alignedPathToAggrIndexesMap.keySet()) { + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context); + readerToAggrIndexesMap.put( + seriesReaderByTimestamp, alignedPathToAggrIndexesMap.get(vectorPath)); + } + // assign null to be friendly for GC + alignedPathToAggrIndexesMap = null; + for (int i = 0; i < selectedSeries.size(); i++) { aggregateResultList[i] = AggregateResultFactory.getAggrResultByName( diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java index ae4eefb..e2c3e60 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java @@ -101,46 +101,50 @@ public class FillQueryExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - List<TimeValuePair> timeValuePairs = getTimeValuePairs(context); - for (int i = 0; i < selectedSeries.size(); i++) { - TSDataType dataType = dataTypes.get(i); + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); + } finally { + StorageEngine.getInstance().mergeUnLock(lockList); + } - if (timeValuePairs.get(i) != null) { - // No need to fill - record.addField(timeValuePairs.get(i).getValue().getValue(), dataType); - continue; - } + List<TimeValuePair> timeValuePairs = getTimeValuePairs(context); + for (int i = 0; i < selectedSeries.size(); i++) { + TSDataType dataType = dataTypes.get(i); - IFill fill = fillExecutors[i]; + if (timeValuePairs.get(i) != null) { + // No need to fill + record.addField(timeValuePairs.get(i).getValue().getValue(), dataType); + continue; + } - if (fill instanceof LinearFill - && (dataType == TSDataType.VECTOR - || dataType == TSDataType.BOOLEAN - || dataType == TSDataType.TEXT)) { - record.addField(null); - logger.info("Linear fill doesn't support the " + i + "-th column in SQL."); - continue; - } + IFill fill = fillExecutors[i]; - TimeValuePair timeValuePair; - try { - timeValuePair = fill.getFillResult(); - if (timeValuePair == null && fill instanceof ValueFill) { - timeValuePair = ((ValueFill) fill).getSpecifiedFillResult(dataType); - } - } catch (QueryProcessException | NumberFormatException ignored) { - record.addField(null); - logger.info("Value fill doesn't support the " + i + "-th column in SQL."); - continue; - } - if (timeValuePair == null || timeValuePair.getValue() == null) { - record.addField(null); - } else { - record.addField(timeValuePair.getValue().getValue(), dataType); + if (fill instanceof LinearFill + && (dataType == TSDataType.VECTOR + || dataType == TSDataType.BOOLEAN + || dataType == TSDataType.TEXT)) { + record.addField(null); + logger.info("Linear fill doesn't support the " + i + "-th column in SQL."); + continue; + } + + TimeValuePair timeValuePair; + try { + timeValuePair = fill.getFillResult(); + if (timeValuePair == null && fill instanceof ValueFill) { + timeValuePair = ((ValueFill) fill).getSpecifiedFillResult(dataType); } + } catch (QueryProcessException | NumberFormatException ignored) { + record.addField(null); + logger.info("Value fill doesn't support the " + i + "-th column in SQL."); + continue; + } + if (timeValuePair == null || timeValuePair.getValue() == null) { + record.addField(null); + } else { + record.addField(timeValuePair.getValue().getValue(), dataType); } - } finally { - StorageEngine.getInstance().mergeUnLock(lockList); } SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes); diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java index 39128a6..ef28a6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java @@ -188,28 +188,31 @@ public class LastQueryExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, filter); - - for (int i = 0; i < nonCachedPaths.size(); i++) { - QueryDataSource dataSource = - QueryResourceManager.getInstance() - .getQueryDataSource(nonCachedPaths.get(i), context, filter, ascending); - LastPointReader lastReader = - nonCachedPaths - .get(i) - .createLastPointReader( - nonCachedDataTypes.get(i), - deviceMeasurementsMap.getOrDefault( - nonCachedPaths.get(i).getDevice(), new HashSet<>()), - context, - dataSource, - Long.MAX_VALUE, - filter); - readerList.add(lastReader); - } + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + for (int i = 0; i < nonCachedPaths.size(); i++) { + QueryDataSource dataSource = + QueryResourceManager.getInstance() + .getQueryDataSource(nonCachedPaths.get(i), context, filter, ascending); + LastPointReader lastReader = + nonCachedPaths + .get(i) + .createLastPointReader( + nonCachedDataTypes.get(i), + deviceMeasurementsMap.getOrDefault( + nonCachedPaths.get(i).getDevice(), new HashSet<>()), + context, + dataSource, + Long.MAX_VALUE, + filter); + readerList.add(lastReader); + } + // Compute Last result for the rest series paths by scanning Tsfiles int index = 0; for (int i = 0; i < resultContainer.size(); i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index 7cb7c73..984234f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -114,11 +114,17 @@ public class RawDataQueryExecutor { lockListAndProcessorToSeriesMapPair.right; try { - // init QueryDataSource cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); + } finally { + StorageEngine.getInstance().mergeUnLock(lockList); + } + try { List<PartialPath> paths = queryPlan.getDeduplicatedPaths(); for (PartialPath path : paths) { TSDataType dataType = path.getSeriesType(); @@ -142,11 +148,10 @@ public class RawDataQueryExecutor { readersOfSelectedSeries.add(reader); } } catch (Exception e) { - logger.error("Meet error when init series reader ", e); - throw new QueryProcessException("Meet error when init series reader.", e); - } finally { - StorageEngine.getInstance().mergeUnLock(lockList); + logger.error("Meet error when init series reader ", e); + throw new QueryProcessException("Meet error when init series reader .", e); } + return readersOfSelectedSeries; } @@ -255,24 +260,26 @@ public class RawDataQueryExecutor { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - - for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { - if (cached.get(i)) { - readersOfSelectedSeries.add(null); - continue; - } - PartialPath path = queryPlan.getDeduplicatedPaths().get(i); - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTimestamp( - path, - queryPlan.getAllMeasurementsInDevice(path.getDevice()), - queryPlan.getDeduplicatedDataTypes().get(i), - context); - readersOfSelectedSeries.add(seriesReaderByTimestamp); - } + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) { + if (cached.get(i)) { + readersOfSelectedSeries.add(null); + continue; + } + PartialPath path = queryPlan.getDeduplicatedPaths().get(i); + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTimestamp( + path, + queryPlan.getAllMeasurementsInDevice(path.getDevice()), + queryPlan.getDeduplicatedDataTypes().get(i), + context); + readersOfSelectedSeries.add(seriesReaderByTimestamp); + } return new Pair<>(readersOfSelectedSeries, readerToIndexList); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java index 55fc9b7..8a52b5b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java @@ -43,6 +43,9 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -54,6 +57,8 @@ import java.util.Map; */ public class ServerTimeGenerator extends TimeGenerator { + private static final Logger logger = LoggerFactory.getLogger(ServerTimeGenerator.class); + protected QueryContext context; protected RawDataQueryPlan queryPlan; @@ -90,11 +95,13 @@ public class ServerTimeGenerator extends TimeGenerator { // init QueryDataSource Cache QueryResourceManager.getInstance() .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter); - - operatorNode = construct(expression); + } catch (Exception e) { + logger.error("Meet error when init QueryDataSource ", e); + throw new QueryProcessException("Meet error when init QueryDataSource.", e); } finally { StorageEngine.getInstance().mergeUnLock(lockList); } + operatorNode = construct(expression); } /**
