This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch unifiy_query_control in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e8efcfeac78c0b5a26738b9102f39bd570a4de8e Author: 江天 <[email protected]> AuthorDate: Mon Apr 1 11:06:07 2019 +0800 provide unified query resource control interface in QueryResourceManager. --- .../iotdb/db/qp/executor/OverflowQPExecutor.java | 15 +++-- .../iotdb/db/qp/executor/QueryProcessExecutor.java | 24 ++++---- .../iotdb/db/query/context/QueryContext.java | 17 ++++++ ...edFilePathsManager.java => JobFileManager.java} | 25 ++------ .../db/query/control/QueryDataSourceManager.java | 53 ---------------- ...TokenManager.java => QueryResourceManager.java} | 50 +++++++++++---- .../iotdb/db/query/control/QuerySession.java | 71 ---------------------- .../db/query/executor/AggregateEngineExecutor.java | 23 ++++--- .../executor/EngineExecutorWithTimeGenerator.java | 19 +++--- .../EngineExecutorWithoutTimeGenerator.java | 19 +++--- .../iotdb/db/query/executor/EngineQueryRouter.java | 48 ++++----------- .../db/query/executor/FillEngineExecutor.java | 9 ++- .../GroupByWithOnlyTimeFilterDataSetDataSet.java | 9 ++- .../GroupByWithValueFilterDataSetDataSet.java | 11 ++-- .../db/query/factory/SeriesReaderFactory.java | 7 +-- .../query/timegenerator/EngineNodeConstructor.java | 8 +-- .../query/timegenerator/EngineTimeGenerator.java | 6 +- .../org/apache/iotdb/db/rescon/package-info.java} | 53 ++++++++-------- .../java/org/apache/iotdb/db/service/IoTDB.java | 1 - .../org/apache/iotdb/db/service/TSServiceImpl.java | 15 +++-- .../engine/modification/DeletionFileNodeTest.java | 25 ++++++-- .../db/engine/modification/DeletionQueryTest.java | 16 +++-- .../db/integration/IoTDBEngineTimeGeneratorIT.java | 20 +++--- .../db/integration/IoTDBSequenceDataQueryIT.java | 24 +++++--- .../iotdb/db/integration/IoTDBSeriesReaderIT.java | 30 ++++++--- .../org/apache/iotdb/db/qp/plan/QPUpdateTest.java | 13 ++-- .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 9 ++- .../db/query/control/FileReaderManagerTest.java | 11 ++-- ...agerTest.java => QueryResourceManagerTest.java} | 2 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 11 +++- 30 files changed, 284 insertions(+), 360 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java index b8c20d7..d477e2f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java @@ -51,6 +51,7 @@ import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan; import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; import org.apache.iotdb.db.qp.physical.sys.PropertyPlan; +import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.db.utils.AuthUtils; import org.apache.iotdb.db.utils.LoadDataUtils; @@ -178,24 +179,26 @@ public class OverflowQPExecutor extends QueryProcessExecutor { } @Override - public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression) + public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression, + QueryContext context) throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException, PathErrorException, IOException { - return queryRouter.aggregate(paths, aggres, expression); + return queryRouter.aggregate(paths, aggres, expression, context); } @Override - public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes) + public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes, + QueryContext context) throws ProcessorException, IOException, PathErrorException, FileNodeManagerException { - return queryRouter.fill(fillPaths, queryTime, fillTypes); + return queryRouter.fill(fillPaths, queryTime, fillTypes, context); } @Override public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression, - long unit, long origin, List<Pair<Long, Long>> intervals) + long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context) throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException, PathErrorException, IOException { - return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals); + return queryRouter.groupBy(paths, aggres, expression, unit, origin, intervals, context); } @Override diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java index f456ac4..60f97f7 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; +import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; @@ -56,7 +57,7 @@ public abstract class QueryProcessExecutor { * @param queryPlan QueryPlan * @return QueryDataSet */ - public QueryDataSet processQuery(QueryPlan queryPlan) + public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context) throws IOException, FileNodeManagerException, PathErrorException, QueryFilterOptimizationException, ProcessorException { @@ -66,19 +67,20 @@ public abstract class QueryProcessExecutor { GroupByPlan groupByPlan = (GroupByPlan) queryPlan; return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(), groupByPlan.getExpression(), groupByPlan.getUnit(), groupByPlan.getOrigin(), - groupByPlan.getIntervals()); + groupByPlan.getIntervals(), context); } if (queryPlan instanceof AggregationPlan) { return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(), - ((AggregationPlan) queryPlan).getExpression()); + ((AggregationPlan) queryPlan).getExpression(), context); } if (queryPlan instanceof FillQueryPlan) { FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan; - return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(), fillQueryPlan.getFillType()); + return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(), + fillQueryPlan.getFillType(), context); } - return queryRouter.query(queryExpression); + return queryRouter.query(queryExpression, context); } public abstract TSDataType getSeriesType(Path fullPath) throws PathErrorException; @@ -101,16 +103,16 @@ public abstract class QueryProcessExecutor { } public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres, - IExpression expression) throws ProcessorException, IOException, PathErrorException, - FileNodeManagerException, QueryFilterOptimizationException; + IExpression expression, QueryContext context) throws ProcessorException, IOException, + PathErrorException, FileNodeManagerException, QueryFilterOptimizationException; public abstract QueryDataSet groupBy(List<Path> paths, List<String> aggres, - IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals) - throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, - QueryFilterOptimizationException; + IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals, + QueryContext context) throws ProcessorException, IOException, PathErrorException, + FileNodeManagerException, QueryFilterOptimizationException; public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, - IFill> fillTypes) + IFill> fillTypes, QueryContext context) throws ProcessorException, IOException, PathErrorException, FileNodeManagerException; /** diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java index 2a7769a..cfb06ad 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java @@ -44,6 +44,15 @@ public class QueryContext { */ private Map<String, List<Modification>> fileModCache = new HashMap<>(); + private long jobId; + + public QueryContext() { + } + + public QueryContext(long jobId) { + this.jobId = jobId; + } + /** * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read * them from 'modFile' and put then into the cache. @@ -75,4 +84,12 @@ public class QueryContext { return pathModifications; } + + public long getJobId() { + return jobId; + } + + public void setJobId(long jobId) { + this.jobId = jobId; + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java similarity index 84% rename from iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java rename to iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java index 62d806c..84e93c6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java @@ -27,12 +27,10 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; /** * <p> - * Singleton pattern, to manage all query tokens. Each jdbc request has an unique job id, in this jdbc request, - * OpenedFilePathsManager manage all the opened files, and store in the set of current job id. + * JobFileManager records the paths of files that every query job uses for QueryResourceManager. + * <p> */ -public class OpenedFilePathsManager { - - +public class JobFileManager { /** * Map<jobId, Set<filePaths>> @@ -40,17 +38,14 @@ public class OpenedFilePathsManager { private ConcurrentHashMap<Long, Set<String>> sealedFilePathsMap; private ConcurrentHashMap<Long, Set<String>> unsealedFilePathsMap; - private OpenedFilePathsManager() { + public JobFileManager() { sealedFilePathsMap = new ConcurrentHashMap<>(); unsealedFilePathsMap = new ConcurrentHashMap<>(); } - public static OpenedFilePathsManager getInstance() { - return OpenedFilePathsManagerHelper.INSTANCE; - } - /** - * Set job id for current request thread. When a query request is created firstly, this method must be invoked. + * Set job id for current request thread. When a query request is created firstly, + * this method must be invoked. */ public void addJobId(long jobId) { sealedFilePathsMap.computeIfAbsent(jobId, x -> new HashSet<>()); @@ -109,12 +104,4 @@ public class OpenedFilePathsManager { FileReaderManager.getInstance().increaseFileReaderReference(filePath, isSealed); } } - - private static class OpenedFilePathsManagerHelper { - private static final OpenedFilePathsManager INSTANCE = new OpenedFilePathsManager(); - - private OpenedFilePathsManagerHelper() { - - } - } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java deleted file mode 100644 index add4e19..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.query.control; - -import org.apache.iotdb.db.engine.filenode.FileNodeManager; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.exception.FileNodeManagerException; -import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; - -/** - * <p> - * This class is used to get query data source of a given path. See the component of - * <code>QueryDataSource</code> - */ -public class QueryDataSourceManager { - - private static FileNodeManager fileNodeManager = FileNodeManager.getInstance(); - - private QueryDataSourceManager() { - } - - public static QueryDataSource getQueryDataSource(long jobId, Path selectedPath, - QueryContext context) - throws FileNodeManagerException { - - SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null); - QueryDataSource queryDataSource = fileNodeManager.query(singleSeriesExpression, context); - - // add used files to current thread request cached map - OpenedFilePathsManager.getInstance() - .addUsedFilesForGivenJob(jobId, queryDataSource); - - return queryDataSource; - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java similarity index 75% rename from iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java rename to iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java index 7a566e8..a535d87 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java @@ -24,8 +24,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.engine.filenode.FileNodeManager; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.ExpressionType; import org.apache.iotdb.tsfile.read.expression.IBinaryExpression; @@ -34,13 +37,17 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; /** * <p> - * Singleton pattern, to manage all query tokens. Each jdbc query request can query multiple series, - * in the processing of querying different device id, the <code>FileNodeManager.getInstance(). - * beginQuery</code> and <code>FileNodeManager.getInstance().endQueryForGivenJob</code> must be invoked in the - * beginning and ending of jdbc request. + * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to the jobs. + * During the life cycle of a query, the following methods must be called in strict order: + * 1. assignJobId - get an Id for the new job. + * 2. beginQueryOfGivenQueryPaths - remind FileNodeManager that some files are being used + * 3. (if using filter)beginQueryOfGivenExpression + * - remind FileNodeManager that some files are being used + * 4. getQueryDataSource - open files for the job or reuse existing readers. + * 5. endQueryForGivenJob - release the resource used by this job. * </p> */ -public class QueryTokenManager { +public class QueryResourceManager { /** * Map<jobId, Map<deviceId, List<token>>>. @@ -79,22 +86,29 @@ public class QueryTokenManager { * </p> */ private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>> queryTokensMap; + private JobFileManager filePathsManager; + private AtomicLong maxJobId; - private QueryTokenManager() { + private QueryResourceManager() { queryTokensMap = new ConcurrentHashMap<>(); + filePathsManager = new JobFileManager(); + maxJobId = new AtomicLong(0); } - public static QueryTokenManager getInstance() { + public static QueryResourceManager getInstance() { return QueryTokenManagerHelper.INSTANCE; } /** - * Set job id for current request thread. When a query request is created firstly, this method + * Assign a jobId for a new query job. When a query request is created firstly, this method * must be invoked. */ - public void addJobId(long jobId) { + public long assignJobId() { + long jobId = maxJobId.incrementAndGet(); queryTokensMap.computeIfAbsent(jobId, x -> new ConcurrentHashMap<>()); + filePathsManager.addJobId(jobId); + return jobId; } /** @@ -126,6 +140,20 @@ public class QueryTokenManager { } } + public QueryDataSource getQueryDataSource(Path selectedPath, + QueryContext context) + throws FileNodeManagerException { + + SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null); + QueryDataSource queryDataSource = FileNodeManager.getInstance() + .query(singleSeriesExpression, context); + + // add used files to current thread request cached map + filePathsManager.addUsedFilesForGivenJob(context.getJobId(), queryDataSource); + + return queryDataSource; + } + /** * Whenever the jdbc request is closed normally or abnormally, this method must be invoked. All * query tokens created by this jdbc request must be cleared. @@ -142,7 +170,7 @@ public class QueryTokenManager { } queryTokensMap.remove(jobId); // remove usage of opened file paths of current thread - OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(jobId); + filePathsManager.removeUsedFilesForGivenJob(jobId); } private void getUniquePaths(IExpression expression, Set<String> deviceIdSet) { @@ -161,7 +189,7 @@ public class QueryTokenManager { private static class QueryTokenManagerHelper { - private static final QueryTokenManager INSTANCE = new QueryTokenManager(); + private static final QueryResourceManager INSTANCE = new QueryResourceManager(); private QueryTokenManagerHelper() { } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java deleted file mode 100644 index 3151745..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.query.control; - -import java.util.concurrent.atomic.AtomicLong; - -public class QuerySession { - /** - * Each jdbc request has an unique jod id, job id is stored in thread local variable jobIdContainer. - */ - private ThreadLocal<Long> jobId; - - /** - * Each unique jdbc request(query, aggregation or others job) has an unique job id. This job id - * will always be maintained until the request is closed. In each job, the unique file will be - * only opened once to avoid too many opened files error. - */ - private static AtomicLong jobIdGenerator = new AtomicLong(); - - private QuerySession() { - this.jobId = new ThreadLocal<Long>(){ - @Override - protected Long initialValue() { - super.initialValue(); - long id = jobIdGenerator.incrementAndGet(); - return id; - } - }; - } - - public long getJobId() { - return jobId.get(); - } - - public static long getCurrentThreadJobId() { - return QuerySessionHelper.INSTANCE.jobId.get(); - } - - private static class QuerySessionHelper { - - private static final QuerySession INSTANCE = new QuerySession(); - - private QuerySessionHelper() { - } - } - - public static QuerySession getCurrentThreadQuerySession() { - return QuerySessionHelper.INSTANCE; - } - - - - -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java index a21cc80..ba76c2f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java @@ -34,8 +34,7 @@ import org.apache.iotdb.db.query.aggregation.AggregateFunction; import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc; import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryDataSourceManager; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader; import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; @@ -55,7 +54,6 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; public class AggregateEngineExecutor { - private long jobId; private List<Path> selectedSeries; private List<String> aggres; private IExpression expression; @@ -68,9 +66,8 @@ public class AggregateEngineExecutor { /** * constructor. */ - public AggregateEngineExecutor(long jobId, List<Path> selectedSeries, List<String> aggres, + public AggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres, IExpression expression) { - this.jobId = jobId; this.selectedSeries = selectedSeries; this.aggres = aggres; this.expression = expression; @@ -88,7 +85,8 @@ public class AggregateEngineExecutor { if (expression != null) { timeFilter = ((GlobalTimeExpression) expression).getFilter(); } - QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); + QueryResourceManager + .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), selectedSeries); List<SequenceDataReader> readersOfSequenceData = new ArrayList<>(); List<IPointReader> readersOfUnSequenceData = new ArrayList<>(); @@ -101,8 +99,8 @@ public class AggregateEngineExecutor { function.init(); aggregateFunctions.add(function); - QueryDataSource queryDataSource = QueryDataSourceManager - .getQueryDataSource(jobId, selectedSeries.get(i), context); + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(selectedSeries.get(i), context); // sequence reader for sealed tsfile, unsealed tsfile, memory SequenceDataReader sequenceReader; @@ -258,12 +256,13 @@ public class AggregateEngineExecutor { */ public QueryDataSet executeWithTimeGenerator(QueryContext context) throws FileNodeManagerException, PathErrorException, IOException, ProcessorException { - QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); - QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, expression); + QueryResourceManager + .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), selectedSeries); + QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(), expression); - EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(jobId, expression, context); + EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(expression, context); List<EngineReaderByTimeStamp> readersOfSelectedSeries = SeriesReaderFactory - .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context); + .getByTimestampReadersOfSelectedPaths(selectedSeries, context); List<AggregateFunction> aggregateFunctions = new ArrayList<>(); for (int i = 0; i < selectedSeries.size(); i++) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java index 77372a6..3b40187 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java @@ -26,7 +26,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; @@ -42,10 +42,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; public class EngineExecutorWithTimeGenerator { private QueryExpression queryExpression; - private long jobId; - EngineExecutorWithTimeGenerator(long jobId, QueryExpression queryExpression) { - this.jobId = jobId; + EngineExecutorWithTimeGenerator(QueryExpression queryExpression) { this.queryExpression = queryExpression; } @@ -58,18 +56,17 @@ public class EngineExecutorWithTimeGenerator { */ public QueryDataSet execute(QueryContext context) throws FileNodeManagerException { - QueryTokenManager.getInstance() - .beginQueryOfGivenQueryPaths(jobId, queryExpression.getSelectedSeries()); - QueryTokenManager.getInstance() - .beginQueryOfGivenExpression(jobId, queryExpression.getExpression()); + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries()); + QueryResourceManager.getInstance() + .beginQueryOfGivenExpression(context.getJobId(), queryExpression.getExpression()); EngineTimeGenerator timestampGenerator; List<EngineReaderByTimeStamp> readersOfSelectedSeries; try { - timestampGenerator = new EngineTimeGenerator(jobId, queryExpression.getExpression(), context); + timestampGenerator = new EngineTimeGenerator(queryExpression.getExpression(), context); readersOfSelectedSeries = SeriesReaderFactory - .getByTimestampReadersOfSelectedPaths(jobId, queryExpression.getSelectedSeries(), - context); + .getByTimestampReadersOfSelectedPaths(queryExpression.getSelectedSeries(), context); } catch (IOException ex) { throw new FileNodeManagerException(ex); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java index 248c2c0..2d07796 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java @@ -27,8 +27,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryDataSourceManager; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.AllDataReader; @@ -48,10 +47,8 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; public class EngineExecutorWithoutTimeGenerator { private QueryExpression queryExpression; - private long jobId; - public EngineExecutorWithoutTimeGenerator(long jobId, QueryExpression queryExpression) { - this.jobId = jobId; + public EngineExecutorWithoutTimeGenerator(QueryExpression queryExpression) { this.queryExpression = queryExpression; } @@ -66,12 +63,12 @@ public class EngineExecutorWithoutTimeGenerator { List<IPointReader> readersOfSelectedSeries = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - QueryTokenManager.getInstance() - .beginQueryOfGivenQueryPaths(jobId, queryExpression.getSelectedSeries()); + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries()); for (Path path : queryExpression.getSelectedSeries()) { - QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path, + QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context); // add data type @@ -120,12 +117,12 @@ public class EngineExecutorWithoutTimeGenerator { List<IPointReader> readersOfSelectedSeries = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - QueryTokenManager.getInstance() - .beginQueryOfGivenQueryPaths(jobId, queryExpression.getSelectedSeries()); + QueryResourceManager.getInstance() + .beginQueryOfGivenQueryPaths(context.getJobId(), queryExpression.getSelectedSeries()); for (Path path : queryExpression.getSelectedSeries()) { - QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path, + QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context); // add data type diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java index 2090074..96ad5ad 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java @@ -27,9 +27,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.OpenedFilePathsManager; -import org.apache.iotdb.db.query.control.QuerySession; -import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet; import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet; import org.apache.iotdb.db.query.fill.IFill; @@ -57,15 +54,9 @@ public class EngineQueryRouter { /** * execute physical plan. */ - public QueryDataSet query(QueryExpression queryExpression) + public QueryDataSet query(QueryExpression queryExpression, QueryContext context) throws FileNodeManagerException { - long nextJobId = QuerySession.getCurrentThreadJobId(); - QueryTokenManager.getInstance().addJobId(nextJobId); - OpenedFilePathsManager.getInstance().addJobId(nextJobId); - - QueryContext context = new QueryContext(); - if (queryExpression.hasQueryFilter()) { try { IExpression optimizedExpression = ExpressionOptimizer.getInstance() @@ -74,12 +65,10 @@ public class EngineQueryRouter { if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { EngineExecutorWithoutTimeGenerator engineExecutor = - new EngineExecutorWithoutTimeGenerator( - nextJobId, queryExpression); + new EngineExecutorWithoutTimeGenerator(queryExpression); return engineExecutor.executeWithGlobalTimeFilter(context); } else { EngineExecutorWithTimeGenerator engineExecutor = new EngineExecutorWithTimeGenerator( - nextJobId, queryExpression); return engineExecutor.execute(context); } @@ -89,7 +78,6 @@ public class EngineQueryRouter { } } else { EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator( - nextJobId, queryExpression); return engineExecutor.executeWithoutFilter(context); } @@ -99,19 +87,13 @@ public class EngineQueryRouter { * execute aggregation query. */ public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres, - IExpression expression) throws QueryFilterOptimizationException, FileNodeManagerException, - IOException, PathErrorException, ProcessorException { - - long nextJobId = QuerySession.getCurrentThreadJobId(); - QueryTokenManager.getInstance().addJobId(nextJobId); - OpenedFilePathsManager.getInstance().addJobId(nextJobId); - - QueryContext context = new QueryContext(); + IExpression expression, QueryContext context) throws QueryFilterOptimizationException, + FileNodeManagerException, IOException, PathErrorException, ProcessorException { if (expression != null) { IExpression optimizedExpression = ExpressionOptimizer.getInstance() .optimize(expression, selectedSeries); - AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(nextJobId, + AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor( selectedSeries, aggres, optimizedExpression); if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) { return engineExecutor.executeWithOutTimeGenerator(context); @@ -119,7 +101,7 @@ public class EngineQueryRouter { return engineExecutor.executeWithTimeGenerator(context); } } else { - AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(nextJobId, + AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor( selectedSeries, aggres, null); return engineExecutor.executeWithOutTimeGenerator(context); } @@ -137,15 +119,12 @@ public class EngineQueryRouter { * @param intervals time intervals, closed interval. */ public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres, - IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals) + IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals, + QueryContext context) throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, PathErrorException, IOException { - long nextJobId = QuerySession.getCurrentThreadJobId(); - QueryTokenManager.getInstance().addJobId(nextJobId); - OpenedFilePathsManager.getInstance().addJobId(nextJobId); - - QueryContext context = new QueryContext(); + long nextJobId = context.getJobId(); // check the legitimacy of intervals for (Pair<Long, Long> pair : intervals) { @@ -205,13 +184,12 @@ public class EngineQueryRouter { * @param queryTime timestamp * @param fillType type IFill map */ - public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType) + public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType, + QueryContext context) throws FileNodeManagerException, PathErrorException, IOException { - long nextJobId = QuerySession.getCurrentThreadJobId(); - QueryTokenManager.getInstance().addJobId(nextJobId); - OpenedFilePathsManager.getInstance().addJobId(nextJobId); - QueryContext context = new QueryContext(); + long nextJobId = context.getJobId(); + FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(nextJobId, fillPaths, queryTime, fillType); return fillEngineExecutor.execute(context); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java index a465bf7..83c5fa9 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java @@ -28,8 +28,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryDataSourceManager; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.db.query.fill.PreviousFill; @@ -60,13 +59,13 @@ public class FillEngineExecutor { */ public QueryDataSet execute(QueryContext context) throws FileNodeManagerException, PathErrorException, IOException { - QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); + QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); List<IFill> fillList = new ArrayList<>(); List<TSDataType> dataTypeList = new ArrayList<>(); for (Path path : selectedSeries) { - QueryDataSource queryDataSource = QueryDataSourceManager - .getQueryDataSource(jobId, path, context); + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(path, context); TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath()); dataTypeList.add(dataType); IFill fill = null; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java index d94aa0d..e82b567 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java @@ -29,8 +29,7 @@ import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.aggregation.AggreResultData; import org.apache.iotdb.db.query.aggregation.AggregateFunction; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryDataSourceManager; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.IAggregateReader; import org.apache.iotdb.db.query.reader.IPointReader; @@ -78,13 +77,13 @@ public class GroupByWithOnlyTimeFilterDataSetDataSet extends GroupByEngineDataSe throws FileNodeManagerException, PathErrorException, ProcessorException, IOException { initAggreFuction(aggres); // init reader - QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); + QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); if (expression != null) { timeFilter = ((GlobalTimeExpression) expression).getFilter(); } for (int i = 0; i < selectedSeries.size(); i++) { - QueryDataSource queryDataSource = QueryDataSourceManager - .getQueryDataSource(jobId, selectedSeries.get(i), context); + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(selectedSeries.get(i), context); // sequence reader for sealed tsfile, unsealed tsfile, memory SequenceDataReader sequenceReader = new SequenceDataReader(queryDataSource.getSeqDataSource(), diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java index d995737..7a6b3d2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java @@ -28,7 +28,7 @@ import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.aggregation.AggregateFunction; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp; import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; @@ -73,11 +73,12 @@ public class GroupByWithValueFilterDataSetDataSet extends GroupByEngineDataSet { throws FileNodeManagerException, PathErrorException, ProcessorException, IOException { initAggreFuction(aggres); - QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, expression); - QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries); - this.timestampGenerator = new EngineTimeGenerator(jobId, expression, context); + QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(), expression); + QueryResourceManager + .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), selectedSeries); + this.timestampGenerator = new EngineTimeGenerator(expression, context); this.allDataReaderList = SeriesReaderFactory - .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context); + .getByTimestampReadersOfSelectedPaths(selectedSeries, context); } @Override diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java index 98b16db..a3c29ba 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java @@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.query.control.QueryDataSourceManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.reader.AllDataReader; import org.apache.iotdb.db.query.reader.IBatchReader; import org.apache.iotdb.db.query.reader.IPointReader; @@ -187,19 +187,18 @@ public class SeriesReaderFactory { /** * construct ByTimestampReader, include sequential data and unsequential data. * - * @param jobId query jobId * @param paths selected series path * @param context query context * @return the list of EngineReaderByTimeStamp */ - public static List<EngineReaderByTimeStamp> getByTimestampReadersOfSelectedPaths(long jobId, + public static List<EngineReaderByTimeStamp> getByTimestampReadersOfSelectedPaths( List<Path> paths, QueryContext context) throws IOException, FileNodeManagerException { List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>(); for (Path path : paths) { - QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path, + QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context); PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java index c5aa51f..4ffe62b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java @@ -27,7 +27,7 @@ import java.io.IOException; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryDataSourceManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.AllDataReader; import org.apache.iotdb.db.query.reader.IReader; @@ -44,10 +44,8 @@ import org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode; public class EngineNodeConstructor { - private long jobId; - public EngineNodeConstructor(long jobId) { - this.jobId = jobId; + public EngineNodeConstructor() { } /** @@ -89,7 +87,7 @@ public class EngineNodeConstructor { QueryContext context) throws IOException, FileNodeManagerException { - QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, + QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource( singleSeriesExpression.getSeriesPath(), context); Filter filter = singleSeriesExpression.getFilter(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java index 897a7d1..350ea6f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java @@ -34,20 +34,18 @@ public class EngineTimeGenerator implements TimeGenerator { private IExpression expression; private Node operatorNode; - private long jobId; /** * Constructor of EngineTimeGenerator. */ - public EngineTimeGenerator(long jobId, IExpression expression, QueryContext context) + public EngineTimeGenerator(IExpression expression, QueryContext context) throws FileNodeManagerException { - this.jobId = jobId; this.expression = expression; initNode(context); } private void initNode(QueryContext context) throws FileNodeManagerException { - EngineNodeConstructor engineNodeConstructor = new EngineNodeConstructor(jobId); + EngineNodeConstructor engineNodeConstructor = new EngineNodeConstructor(); this.operatorNode = engineNodeConstructor.construct(expression, context); } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java b/iotdb/src/main/java/org/apache/iotdb/db/rescon/package-info.java similarity index 81% copy from iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java copy to iotdb/src/main/java/org/apache/iotdb/db/rescon/package-info.java index 3ac1540..96c83c9 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/rescon/package-info.java @@ -1,29 +1,24 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.query.control; - -import org.junit.Test; - -public class QueryTokenManagerTest { - - @Test - public void test() { - //TODO - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * rescon means Resource Control, classes in this package provide global control over various + * resources shared in IoTDB. + */ +package org.apache.iotdb.db.rescon; \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java index c63ec74..d85f056 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -105,7 +105,6 @@ public class IoTDB implements IoTDBMBean { registerManager.register(CloseMergeService.getInstance()); registerManager.register(StatMonitor.getInstance()); registerManager.register(BasicMemController.getInstance()); - registerManager.register(FileReaderManager.getInstance()); registerManager.register(SyncServerManager.getInstance()); JMXService.registerMBean(getInstance(), mbeanName); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index fca74d1..ad3c855 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -49,8 +49,8 @@ import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; -import org.apache.iotdb.db.query.control.QuerySession; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.service.rpc.thrift.ServerProperties; import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; import org.apache.iotdb.service.rpc.thrift.TSCancelOperationResp; @@ -104,6 +104,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new ThreadLocal<>(); private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>(); private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private QueryContext context; public TSServiceImpl() throws IOException { // do nothing because there is no need @@ -185,7 +186,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME); try { // end query for all the query tokens created by current thread - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + if(context != null) { + QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId()); + } clearAllStatusForCurrentRequest(); } catch (FileNodeManagerException e) { @@ -590,7 +593,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (!queryRet.get().containsKey(statement)) { PhysicalPlan physicalPlan = queryStatus.get().get(statement); processor.getExecutor().setFetchSize(fetchSize); - queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan); + + context = new QueryContext(); + context.setJobId(QueryResourceManager.getInstance().assignJobId()); + + queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan, context); queryRet.get().put(statement, queryDataSet); } else { queryDataSet = queryRet.get().get(statement); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java index 35bfc84..03d3e57 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.modification; import static junit.framework.TestCase.assertTrue; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; import static org.junit.Assert.assertEquals; import java.io.File; @@ -35,8 +37,9 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.MetadataArgsErrorException; import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -67,7 +70,9 @@ public class DeletionFileNodeTest { @Before public void setup() throws MetadataArgsErrorException, - PathErrorException, IOException, FileNodeManagerException { + PathErrorException, IOException, FileNodeManagerException, StartupException { + EnvironmentUtils.envSetUp(); + MManager.getInstance().setStorageLevelToMTree(processorName); for (int i = 0; i < 10; i++) { MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType, @@ -103,8 +108,10 @@ public class DeletionFileNodeTest { SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName, measurements[5]), null); - QueryContext context = new QueryContext(); - QueryDataSource dataSource = FileNodeManager.getInstance().query(expression, context); + QueryResourceManager.getInstance().beginQueryOfGivenExpression(TEST_QUERY_JOB_ID, expression); + QueryDataSource dataSource = QueryResourceManager.getInstance() + .getQueryDataSource(expression.getSeriesPath(), TEST_QUERY_CONTEXT); + Iterator<TimeValuePair> timeValuePairs = dataSource.getSeqDataSource().getReadableChunk().getIterator(); int count = 0; @@ -113,6 +120,7 @@ public class DeletionFileNodeTest { count++; } assertEquals(50, count); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test @@ -185,8 +193,11 @@ public class DeletionFileNodeTest { SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName, measurements[5]), null); - QueryContext context = new QueryContext(); - QueryDataSource dataSource = FileNodeManager.getInstance().query(expression, context); + + QueryResourceManager.getInstance().beginQueryOfGivenExpression(TEST_QUERY_JOB_ID, expression); + QueryDataSource dataSource = QueryResourceManager.getInstance() + .getQueryDataSource(expression.getSeriesPath(), TEST_QUERY_CONTEXT); + Iterator<TimeValuePair> timeValuePairs = dataSource.getOverflowSeriesDataSource().getReadableMemChunk().getIterator(); int count = 0; @@ -195,6 +206,8 @@ public class DeletionFileNodeTest { count++; } assertEquals(50, count); + + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java index c3e7dc3..3e9bb99 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.modification; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.MetadataArgsErrorException; import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -63,7 +65,9 @@ public class DeletionQueryTest { @Before public void setup() throws MetadataArgsErrorException, - PathErrorException, IOException, FileNodeManagerException { + PathErrorException, IOException, FileNodeManagerException, StartupException { + EnvironmentUtils.envSetUp(); + MManager.getInstance().setStorageLevelToMTree(processorName); for (int i = 0; i < 10; i++) { MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType, @@ -103,7 +107,7 @@ public class DeletionQueryTest { pathList.add(new Path(processorName, measurements[5])); QueryExpression queryExpression = QueryExpression.create(pathList, null); - QueryDataSet dataSet = router.query(queryExpression); + QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT); int count = 0; while (dataSet.hasNext()) { @@ -134,7 +138,7 @@ public class DeletionQueryTest { pathList.add(new Path(processorName, measurements[5])); QueryExpression queryExpression = QueryExpression.create(pathList, null); - QueryDataSet dataSet = router.query(queryExpression); + QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT); int count = 0; while (dataSet.hasNext()) { @@ -176,7 +180,7 @@ public class DeletionQueryTest { pathList.add(new Path(processorName, measurements[5])); QueryExpression queryExpression = QueryExpression.create(pathList, null); - QueryDataSet dataSet = router.query(queryExpression); + QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT); int count = 0; while (dataSet.hasNext()) { @@ -218,7 +222,7 @@ public class DeletionQueryTest { pathList.add(new Path(processorName, measurements[5])); QueryExpression queryExpression = QueryExpression.create(pathList, null); - QueryDataSet dataSet = router.query(queryExpression); + QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT); int count = 0; while (dataSet.hasNext()) { @@ -282,7 +286,7 @@ public class DeletionQueryTest { pathList.add(new Path(processorName, measurements[5])); QueryExpression queryExpression = QueryExpression.create(pathList, null); - QueryDataSet dataSet = router.query(queryExpression); + QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT); int count = 0; while (dataSet.hasNext()) { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java index 82f7c23..e172971 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.integration; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -29,7 +30,7 @@ import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.OpenedFilePathsManager; +import org.apache.iotdb.db.query.control.JobFileManager; import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -197,10 +198,8 @@ public class IoTDBEngineTimeGeneratorIT { SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(pd0s0, FilterFactory.and(valueGtEq, timeGt)); - OpenedFilePathsManager.getInstance().addJobId(0); - QueryContext context = new QueryContext(); - EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression, - context); + EngineTimeGenerator timeGenerator = new EngineTimeGenerator(singleSeriesExpression, + TEST_QUERY_CONTEXT); int cnt = 0; while (timeGenerator.hasNext()) { @@ -222,11 +221,9 @@ public class IoTDBEngineTimeGeneratorIT { Path pd1s0 = new Path(Constant.d1s0); ValueFilter.ValueGtEq valueGtEq = ValueFilter.gtEq(5); - OpenedFilePathsManager.getInstance().addJobId(0); IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0, valueGtEq); - QueryContext context = new QueryContext(); - EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression, - context); + EngineTimeGenerator timeGenerator = new EngineTimeGenerator(singleSeriesExpression, + TEST_QUERY_CONTEXT); int cnt = 0; while (timeGenerator.hasNext()) { @@ -258,9 +255,8 @@ public class IoTDBEngineTimeGeneratorIT { IExpression andExpression = BinaryExpression .and(singleSeriesExpression1, singleSeriesExpression2); - OpenedFilePathsManager.getInstance().addJobId(0); - QueryContext context = new QueryContext(); - EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression, context); + EngineTimeGenerator timeGenerator = new EngineTimeGenerator(andExpression, + TEST_QUERY_CONTEXT); int cnt = 0; while (timeGenerator.hasNext()) { long time = timeGenerator.next(); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java index d7c091e..a4b95a3 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.integration; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -27,8 +29,8 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.exception.FileNodeManagerException; -import org.apache.iotdb.db.query.control.QuerySession; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -179,7 +181,9 @@ public class IoTDBSequenceDataQueryIT { queryExpression.addSelectedPath(new Path(Constant.d1s1)); queryExpression.setExpression(null); - QueryDataSet queryDataSet = engineExecutor.query(queryExpression); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT); int cnt = 0; while (queryDataSet.hasNext()) { @@ -189,7 +193,7 @@ public class IoTDBSequenceDataQueryIT { } assertEquals(1000, cnt); - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test @@ -202,7 +206,9 @@ public class IoTDBSequenceDataQueryIT { GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(TimeFilter.gtEq(800L)); queryExpression.setExpression(globalTimeExpression); - QueryDataSet queryDataSet = engineExecutor.query(queryExpression); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT); int cnt = 0; while (queryDataSet.hasNext()) { @@ -215,7 +221,7 @@ public class IoTDBSequenceDataQueryIT { } assertEquals(350, cnt); - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test @@ -236,7 +242,9 @@ public class IoTDBSequenceDataQueryIT { ValueFilter.gtEq(14)); queryExpression.setExpression(singleSeriesExpression); - QueryDataSet queryDataSet = engineExecutor.query(queryExpression); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT); int cnt = 0; while (queryDataSet.hasNext()) { @@ -246,7 +254,7 @@ public class IoTDBSequenceDataQueryIT { } assertEquals(count, cnt); - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java index 1c991ae..6b8d413 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.integration; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @@ -29,8 +31,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.exception.FileNodeManagerException; -import org.apache.iotdb.db.query.control.QuerySession; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -254,7 +256,9 @@ public class IoTDBSeriesReaderIT { queryExpression.addSelectedPath(new Path(Constant.d1s1)); queryExpression.setExpression(null); - QueryDataSet queryDataSet = engineExecutor.query(queryExpression); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT); int cnt = 0; while (queryDataSet.hasNext()) { @@ -263,7 +267,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(23400, cnt); - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test @@ -280,7 +284,9 @@ public class IoTDBSeriesReaderIT { ValueFilter.gtEq(20)); queryExpression.setExpression(singleSeriesExpression); - QueryDataSet queryDataSet = engineExecutor.query(queryExpression); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT); int cnt = 0; while (queryDataSet.hasNext()) { @@ -291,7 +297,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(16440, cnt); - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test @@ -306,7 +312,9 @@ public class IoTDBSeriesReaderIT { SingleSeriesExpression expression = new SingleSeriesExpression(path, TimeFilter.gt(22987L)); queryExpression.setExpression(expression); - QueryDataSet queryDataSet = engineExecutor.query(queryExpression); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT); int cnt = 0; while (queryDataSet.hasNext()) { @@ -317,7 +325,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(3012, cnt); - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test @@ -333,7 +341,9 @@ public class IoTDBSeriesReaderIT { ValueFilter.lt(111)); queryExpression.setExpression(singleSeriesExpression); - QueryDataSet queryDataSet = engineExecutor.query(queryExpression); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + QueryDataSet queryDataSet = engineExecutor.query(queryExpression, TEST_QUERY_CONTEXT); int cnt = 0; while (queryDataSet.hasNext()) { @@ -344,7 +354,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(22800, cnt); - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); } @Test diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java index 42e279d..7caaa59 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.qp.plan; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -123,7 +124,8 @@ public class QPUpdateTest { // query to assert sqlStr = "select sensor_1,sensor_2 from root.qp_update_test.device_1"; PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr); - QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2); + QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2, + TEST_QUERY_CONTEXT); String[] expect = {"10 33000 null", "20 null 10"}; int i = 0; while (queryDataSet.hasNext()) { @@ -145,7 +147,8 @@ public class QPUpdateTest { PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr); // RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1", "sensor_1"); // RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1", "sensor_2"); - QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2); + QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2, + TEST_QUERY_CONTEXT); String[] expect = {"20 null 10"}; int i = 0; @@ -168,7 +171,8 @@ public class QPUpdateTest { PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr); // RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1", "sensor_1"); // RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1", "sensor_2"); - QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2); + QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2, + TEST_QUERY_CONTEXT); String[] expect = {"20 null 10"}; int i = 0; @@ -193,7 +197,8 @@ public class QPUpdateTest { // query to assert sqlStr = "select sensor_1,sensor_2 from root.qp_update_test.device_1"; PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr); - QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2); + QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) plan2, + TEST_QUERY_CONTEXT); String[] expect = {"13 50 40", "20 null 10"}; int i = 0; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java index 07983b3..1446cb1 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; +import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.fill.IFill; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -108,7 +109,8 @@ public class MemIntQpExecutor extends QueryProcessExecutor { } @Override - public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression) + public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression, + QueryContext context) throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException { return null; @@ -116,14 +118,15 @@ public class MemIntQpExecutor extends QueryProcessExecutor { @Override public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression, - long unit, long origin, List<Pair<Long, Long>> intervals) + long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext context) throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException { return null; } @Override - public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes) + public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes, + QueryContext context) throws ProcessorException, IOException, PathErrorException, FileNodeManagerException { return null; } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java index 81d6f30..970489d 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.query.control; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; import static org.junit.Assert.fail; import java.io.File; @@ -54,6 +55,7 @@ public class FileReaderManagerTest { String filePath = "target/test.file"; FileReaderManager manager = FileReaderManager.getInstance(); + JobFileManager testManager = new JobFileManager(); for (int i = 1; i <= MAX_FILE_SIZE; i++) { File file = new File(filePath + i); @@ -62,10 +64,10 @@ public class FileReaderManagerTest { Thread t1 = new Thread(() -> { try { - OpenedFilePathsManager.getInstance().addJobId(1L); + testManager.addJobId(1L); for (int i = 1; i <= 6; i++) { - OpenedFilePathsManager.getInstance().addFilePathToMap(1L, filePath + i, + testManager.addFilePathToMap(1L, filePath + i, false); manager.get(filePath + i, false); Assert.assertTrue(manager.contains(filePath + i, false)); @@ -80,10 +82,10 @@ public class FileReaderManagerTest { Thread t2 = new Thread(() -> { try { - OpenedFilePathsManager.getInstance().addJobId(2L); + testManager.addJobId(2L); for (int i = 4; i <= MAX_FILE_SIZE; i++) { - OpenedFilePathsManager.getInstance().addFilePathToMap(2L, filePath + i, + testManager.addFilePathToMap(2L, filePath + i, false); manager.get(filePath + i, false); Assert.assertTrue(manager.contains(filePath + i, false)); @@ -120,7 +122,6 @@ public class FileReaderManagerTest { // } // } - OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(QuerySession.getCurrentThreadJobId()); FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); for (int i = 1; i < MAX_FILE_SIZE; i++) { File file = new File(filePath + i); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryResourceManagerTest.java similarity index 93% rename from iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java rename to iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryResourceManagerTest.java index 3ac1540..930ab6f 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryResourceManagerTest.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.control; import org.junit.Test; -public class QueryTokenManagerTest { +public class QueryResourceManagerTest { @Test public void test() { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index ad5a007..d4d9c63 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -34,9 +34,9 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.StatMonitor; +import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.query.control.QuerySession; -import org.apache.iotdb.db.query.control.QueryTokenManager; +import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -59,9 +59,12 @@ public class EnvironmentUtils { private static Directories directories = Directories.getInstance(); private static TSFileConfig tsfileConfig = TSFileDescriptor.getInstance().getConfig(); + public static long TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + public static QueryContext TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); + public static void cleanEnv() throws IOException, FileNodeManagerException { - QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); + QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID); // clear opened file streams FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); @@ -169,5 +172,7 @@ public class EnvironmentUtils { } FileNodeManager.getInstance().resetFileNodeManager(); MultiFileLogNodeManager.getInstance().start(); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId(); + TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); } }
