This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch unifiy_query_control
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e8efcfeac78c0b5a26738b9102f39bd570a4de8e
Author: 江天 <[email protected]>
AuthorDate: Mon Apr 1 11:06:07 2019 +0800

    provide unified query resource control interface in QueryResourceManager.
---
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   | 15 +++--
 .../iotdb/db/qp/executor/QueryProcessExecutor.java | 24 ++++----
 .../iotdb/db/query/context/QueryContext.java       | 17 ++++++
 ...edFilePathsManager.java => JobFileManager.java} | 25 ++------
 .../db/query/control/QueryDataSourceManager.java   | 53 ----------------
 ...TokenManager.java => QueryResourceManager.java} | 50 +++++++++++----
 .../iotdb/db/query/control/QuerySession.java       | 71 ----------------------
 .../db/query/executor/AggregateEngineExecutor.java | 23 ++++---
 .../executor/EngineExecutorWithTimeGenerator.java  | 19 +++---
 .../EngineExecutorWithoutTimeGenerator.java        | 19 +++---
 .../iotdb/db/query/executor/EngineQueryRouter.java | 48 ++++-----------
 .../db/query/executor/FillEngineExecutor.java      |  9 ++-
 .../GroupByWithOnlyTimeFilterDataSetDataSet.java   |  9 ++-
 .../GroupByWithValueFilterDataSetDataSet.java      | 11 ++--
 .../db/query/factory/SeriesReaderFactory.java      |  7 +--
 .../query/timegenerator/EngineNodeConstructor.java |  8 +--
 .../query/timegenerator/EngineTimeGenerator.java   |  6 +-
 .../org/apache/iotdb/db/rescon/package-info.java}  | 53 ++++++++--------
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  1 -
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 15 +++--
 .../engine/modification/DeletionFileNodeTest.java  | 25 ++++++--
 .../db/engine/modification/DeletionQueryTest.java  | 16 +++--
 .../db/integration/IoTDBEngineTimeGeneratorIT.java | 20 +++---
 .../db/integration/IoTDBSequenceDataQueryIT.java   | 24 +++++---
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  | 30 ++++++---
 .../org/apache/iotdb/db/qp/plan/QPUpdateTest.java  | 13 ++--
 .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java |  9 ++-
 .../db/query/control/FileReaderManagerTest.java    | 11 ++--
 ...agerTest.java => QueryResourceManagerTest.java} |  2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    | 11 +++-
 30 files changed, 284 insertions(+), 360 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index b8c20d7..d477e2f 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
 import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
 import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.utils.AuthUtils;
 import org.apache.iotdb.db.utils.LoadDataUtils;
@@ -178,24 +179,26 @@ public class OverflowQPExecutor extends 
QueryProcessExecutor {
   }
 
   @Override
-  public QueryDataSet aggregate(List<Path> paths, List<String> aggres, 
IExpression expression)
+  public QueryDataSet aggregate(List<Path> paths, List<String> aggres, 
IExpression expression,
+      QueryContext context)
       throws ProcessorException, FileNodeManagerException, 
QueryFilterOptimizationException,
       PathErrorException, IOException {
-    return queryRouter.aggregate(paths, aggres, expression);
+    return queryRouter.aggregate(paths, aggres, expression, context);
   }
 
   @Override
-  public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillTypes)
+  public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillTypes,
+      QueryContext context)
       throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException {
-    return queryRouter.fill(fillPaths, queryTime, fillTypes);
+    return queryRouter.fill(fillPaths, queryTime, fillTypes, context);
   }
 
   @Override
   public QueryDataSet groupBy(List<Path> paths, List<String> aggres, 
IExpression expression,
-      long unit, long origin, List<Pair<Long, Long>> intervals)
+      long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext 
context)
       throws ProcessorException, FileNodeManagerException, 
QueryFilterOptimizationException,
       PathErrorException, IOException {
-    return queryRouter.groupBy(paths, aggres, expression, unit, origin, 
intervals);
+    return queryRouter.groupBy(paths, aggres, expression, unit, origin, 
intervals, context);
   }
 
   @Override
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index f456ac4..60f97f7 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -56,7 +57,7 @@ public abstract class QueryProcessExecutor {
    * @param queryPlan QueryPlan
    * @return QueryDataSet
    */
-  public QueryDataSet processQuery(QueryPlan queryPlan)
+  public QueryDataSet processQuery(QueryPlan queryPlan, QueryContext context)
       throws IOException, FileNodeManagerException, PathErrorException,
       QueryFilterOptimizationException, ProcessorException {
 
@@ -66,19 +67,20 @@ public abstract class QueryProcessExecutor {
       GroupByPlan groupByPlan = (GroupByPlan) queryPlan;
       return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(),
           groupByPlan.getExpression(), groupByPlan.getUnit(), 
groupByPlan.getOrigin(),
-          groupByPlan.getIntervals());
+          groupByPlan.getIntervals(), context);
     }
 
     if (queryPlan instanceof AggregationPlan) {
       return aggregate(queryPlan.getPaths(), queryPlan.getAggregations(),
-          ((AggregationPlan) queryPlan).getExpression());
+          ((AggregationPlan) queryPlan).getExpression(), context);
     }
 
     if (queryPlan instanceof FillQueryPlan) {
       FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
-      return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(), 
fillQueryPlan.getFillType());
+      return fill(queryPlan.getPaths(), fillQueryPlan.getQueryTime(),
+          fillQueryPlan.getFillType(), context);
     }
-    return queryRouter.query(queryExpression);
+    return queryRouter.query(queryExpression, context);
   }
 
   public abstract TSDataType getSeriesType(Path fullPath) throws 
PathErrorException;
@@ -101,16 +103,16 @@ public abstract class QueryProcessExecutor {
   }
 
   public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres,
-      IExpression expression) throws ProcessorException, IOException, 
PathErrorException,
-      FileNodeManagerException, QueryFilterOptimizationException;
+      IExpression expression, QueryContext context) throws ProcessorException, 
IOException,
+      PathErrorException, FileNodeManagerException, 
QueryFilterOptimizationException;
 
   public abstract QueryDataSet groupBy(List<Path> paths, List<String> aggres,
-      IExpression expression, long unit, long origin, List<Pair<Long, Long>> 
intervals)
-      throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException,
-      QueryFilterOptimizationException;
+      IExpression expression, long unit, long origin, List<Pair<Long, Long>> 
intervals,
+      QueryContext context) throws ProcessorException, IOException, 
PathErrorException,
+      FileNodeManagerException, QueryFilterOptimizationException;
 
   public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType,
-      IFill> fillTypes)
+      IFill> fillTypes, QueryContext context)
       throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException;
 
   /**
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index 2a7769a..cfb06ad 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -44,6 +44,15 @@ public class QueryContext {
    */
   private Map<String, List<Modification>> fileModCache = new HashMap<>();
 
+  private long jobId;
+
+  public QueryContext() {
+  }
+
+  public QueryContext(long jobId) {
+    this.jobId = jobId;
+  }
+
   /**
    * Find the modifications of timeseries 'path' in 'modFile'. If they are not 
in the cache, read
    * them from 'modFile' and put then into the cache.
@@ -75,4 +84,12 @@ public class QueryContext {
 
     return pathModifications;
   }
+
+  public long getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(long jobId) {
+    this.jobId = jobId;
+  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
 b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
similarity index 84%
rename from 
iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
rename to 
iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
index 62d806c..84e93c6 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
@@ -27,12 +27,10 @@ import 
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 
 /**
  * <p>
- * Singleton pattern, to manage all query tokens. Each jdbc request has an 
unique job id, in this jdbc request,
- * OpenedFilePathsManager manage all the opened files, and store in the set of 
current job id.
+ * JobFileManager records the paths of files that every query job uses for 
QueryResourceManager.
+ * <p>
  */
-public class OpenedFilePathsManager {
-
-
+public class JobFileManager {
 
   /**
    * Map<jobId, Set<filePaths>>
@@ -40,17 +38,14 @@ public class OpenedFilePathsManager {
   private ConcurrentHashMap<Long, Set<String>> sealedFilePathsMap;
   private ConcurrentHashMap<Long, Set<String>> unsealedFilePathsMap;
 
-  private OpenedFilePathsManager() {
+  public JobFileManager() {
     sealedFilePathsMap = new ConcurrentHashMap<>();
     unsealedFilePathsMap = new ConcurrentHashMap<>();
   }
 
-  public static OpenedFilePathsManager getInstance() {
-    return OpenedFilePathsManagerHelper.INSTANCE;
-  }
-
   /**
-   * Set job id for current request thread. When a query request is created 
firstly, this method must be invoked.
+   * Set job id for current request thread. When a query request is created 
firstly,
+   * this method must be invoked.
    */
   public void addJobId(long jobId) {
     sealedFilePathsMap.computeIfAbsent(jobId, x -> new HashSet<>());
@@ -109,12 +104,4 @@ public class OpenedFilePathsManager {
       FileReaderManager.getInstance().increaseFileReaderReference(filePath, 
isSealed);
     }
   }
-
-  private static class OpenedFilePathsManagerHelper {
-    private static final OpenedFilePathsManager INSTANCE = new 
OpenedFilePathsManager();
-
-    private OpenedFilePathsManagerHelper() {
-
-    }
-  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
deleted file mode 100644
index add4e19..0000000
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.control;
-
-import org.apache.iotdb.db.engine.filenode.FileNodeManager;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-
-/**
- * <p>
- * This class is used to get query data source of a given path. See the 
component of
- * <code>QueryDataSource</code>
- */
-public class QueryDataSourceManager {
-
-  private static FileNodeManager fileNodeManager = 
FileNodeManager.getInstance();
-
-  private QueryDataSourceManager() {
-  }
-
-  public static QueryDataSource getQueryDataSource(long jobId, Path 
selectedPath,
-      QueryContext context)
-      throws FileNodeManagerException {
-
-    SingleSeriesExpression singleSeriesExpression = new 
SingleSeriesExpression(selectedPath, null);
-    QueryDataSource queryDataSource = 
fileNodeManager.query(singleSeriesExpression, context);
-
-    // add used files to current thread request cached map
-    OpenedFilePathsManager.getInstance()
-        .addUsedFilesForGivenJob(jobId, queryDataSource);
-
-    return queryDataSource;
-  }
-}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
similarity index 75%
rename from 
iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
rename to 
iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 7a566e8..a535d87 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -24,8 +24,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
@@ -34,13 +37,17 @@ import 
org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 
 /**
  * <p>
- * Singleton pattern, to manage all query tokens. Each jdbc query request can 
query multiple series,
- * in the processing of querying different device id, the 
<code>FileNodeManager.getInstance().
- * beginQuery</code> and 
<code>FileNodeManager.getInstance().endQueryForGivenJob</code> must be invoked 
in the
- * beginning and ending of jdbc request.
+ * QueryResourceManager manages resource (file streams) used by each query 
job, and assign Ids to the jobs.
+ * During the life cycle of a query, the following methods must be called in 
strict order:
+ * 1. assignJobId - get an Id for the new job.
+ * 2. beginQueryOfGivenQueryPaths - remind FileNodeManager that some files are 
being used
+ * 3. (if using filter)beginQueryOfGivenExpression
+ *     - remind FileNodeManager that some files are being used
+ * 4. getQueryDataSource - open files for the job or reuse existing readers.
+ * 5. endQueryForGivenJob - release the resource used by this job.
  * </p>
  */
-public class QueryTokenManager {
+public class QueryResourceManager {
 
   /**
    * Map&lt;jobId, Map&lt;deviceId, List&lt;token&gt;&gt;&gt;.
@@ -79,22 +86,29 @@ public class QueryTokenManager {
    * </p>
    */
   private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>> 
queryTokensMap;
+  private JobFileManager filePathsManager;
+  private AtomicLong maxJobId;
 
 
-  private QueryTokenManager() {
+  private QueryResourceManager() {
     queryTokensMap = new ConcurrentHashMap<>();
+    filePathsManager = new JobFileManager();
+    maxJobId = new AtomicLong(0);
   }
 
-  public static QueryTokenManager getInstance() {
+  public static QueryResourceManager getInstance() {
     return QueryTokenManagerHelper.INSTANCE;
   }
 
   /**
-   * Set job id for current request thread. When a query request is created 
firstly, this method
+   * Assign a jobId for a new query job. When a query request is created 
firstly, this method
    * must be invoked.
    */
-  public void addJobId(long jobId) {
+  public long assignJobId() {
+    long jobId = maxJobId.incrementAndGet();
     queryTokensMap.computeIfAbsent(jobId, x -> new ConcurrentHashMap<>());
+    filePathsManager.addJobId(jobId);
+    return jobId;
   }
 
   /**
@@ -126,6 +140,20 @@ public class QueryTokenManager {
     }
   }
 
+  public QueryDataSource getQueryDataSource(Path selectedPath,
+      QueryContext context)
+      throws FileNodeManagerException {
+
+    SingleSeriesExpression singleSeriesExpression = new 
SingleSeriesExpression(selectedPath, null);
+    QueryDataSource queryDataSource = FileNodeManager.getInstance()
+        .query(singleSeriesExpression, context);
+
+    // add used files to current thread request cached map
+    filePathsManager.addUsedFilesForGivenJob(context.getJobId(), 
queryDataSource);
+
+    return queryDataSource;
+  }
+
   /**
    * Whenever the jdbc request is closed normally or abnormally, this method 
must be invoked. All
    * query tokens created by this jdbc request must be cleared.
@@ -142,7 +170,7 @@ public class QueryTokenManager {
       }
       queryTokensMap.remove(jobId);
     // remove usage of opened file paths of current thread
-    OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(jobId);
+    filePathsManager.removeUsedFilesForGivenJob(jobId);
   }
 
   private void getUniquePaths(IExpression expression, Set<String> deviceIdSet) 
{
@@ -161,7 +189,7 @@ public class QueryTokenManager {
 
   private static class QueryTokenManagerHelper {
 
-    private static final QueryTokenManager INSTANCE = new QueryTokenManager();
+    private static final QueryResourceManager INSTANCE = new 
QueryResourceManager();
 
     private QueryTokenManagerHelper() {
     }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
deleted file mode 100644
index 3151745..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.control;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class QuerySession {
-  /**
-   * Each jdbc request has an unique jod id, job id is stored in thread local 
variable jobIdContainer.
-   */
-  private ThreadLocal<Long> jobId;
-
-  /**
-   * Each unique jdbc request(query, aggregation or others job) has an unique 
job id. This job id
-   * will always be maintained until the request is closed. In each job, the 
unique file will be
-   * only opened once to avoid too many opened files error.
-   */
-  private static AtomicLong jobIdGenerator = new AtomicLong();
-
-  private QuerySession() {
-    this.jobId = new ThreadLocal<Long>(){
-      @Override
-      protected Long initialValue() {
-        super.initialValue();
-        long id = jobIdGenerator.incrementAndGet();
-        return id;
-      }
-    };
-  }
-
-  public long getJobId() {
-    return jobId.get();
-  }
-
-  public static long getCurrentThreadJobId() {
-    return QuerySessionHelper.INSTANCE.jobId.get();
-  }
-
-  private static class QuerySessionHelper {
-
-    private static final QuerySession INSTANCE = new QuerySession();
-
-    private QuerySessionHelper() {
-    }
-  }
-
-  public static QuerySession getCurrentThreadQuerySession() {
-    return QuerySessionHelper.INSTANCE;
-  }
-
-
-
-
-}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index a21cc80..ba76c2f 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -34,8 +34,7 @@ import 
org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
 import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
@@ -55,7 +54,6 @@ import 
org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 public class AggregateEngineExecutor {
 
-  private long jobId;
   private List<Path> selectedSeries;
   private List<String> aggres;
   private IExpression expression;
@@ -68,9 +66,8 @@ public class AggregateEngineExecutor {
   /**
    * constructor.
    */
-  public AggregateEngineExecutor(long jobId, List<Path> selectedSeries, 
List<String> aggres,
+  public AggregateEngineExecutor(List<Path> selectedSeries, List<String> 
aggres,
       IExpression expression) {
-    this.jobId = jobId;
     this.selectedSeries = selectedSeries;
     this.aggres = aggres;
     this.expression = expression;
@@ -88,7 +85,8 @@ public class AggregateEngineExecutor {
     if (expression != null) {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
     }
-    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
+    QueryResourceManager
+        .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), 
selectedSeries);
 
     List<SequenceDataReader> readersOfSequenceData = new ArrayList<>();
     List<IPointReader> readersOfUnSequenceData = new ArrayList<>();
@@ -101,8 +99,8 @@ public class AggregateEngineExecutor {
       function.init();
       aggregateFunctions.add(function);
 
-      QueryDataSource queryDataSource = QueryDataSourceManager
-          .getQueryDataSource(jobId, selectedSeries.get(i), context);
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(selectedSeries.get(i), context);
 
       // sequence reader for sealed tsfile, unsealed tsfile, memory
       SequenceDataReader sequenceReader;
@@ -258,12 +256,13 @@ public class AggregateEngineExecutor {
    */
   public QueryDataSet executeWithTimeGenerator(QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException, 
ProcessorException {
-    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
-    QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, 
expression);
+    QueryResourceManager
+        .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), 
selectedSeries);
+    
QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(),
 expression);
 
-    EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(jobId, 
expression, context);
+    EngineTimeGenerator timestampGenerator = new 
EngineTimeGenerator(expression, context);
     List<EngineReaderByTimeStamp> readersOfSelectedSeries = SeriesReaderFactory
-        .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context);
+        .getByTimestampReadersOfSelectedPaths(selectedSeries, context);
 
     List<AggregateFunction> aggregateFunctions = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index 77372a6..3b40187 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -42,10 +42,8 @@ import 
org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 public class EngineExecutorWithTimeGenerator {
 
   private QueryExpression queryExpression;
-  private long jobId;
 
-  EngineExecutorWithTimeGenerator(long jobId, QueryExpression queryExpression) 
{
-    this.jobId = jobId;
+  EngineExecutorWithTimeGenerator(QueryExpression queryExpression) {
     this.queryExpression = queryExpression;
   }
 
@@ -58,18 +56,17 @@ public class EngineExecutorWithTimeGenerator {
    */
   public QueryDataSet execute(QueryContext context) throws 
FileNodeManagerException {
 
-    QueryTokenManager.getInstance()
-        .beginQueryOfGivenQueryPaths(jobId, 
queryExpression.getSelectedSeries());
-    QueryTokenManager.getInstance()
-        .beginQueryOfGivenExpression(jobId, queryExpression.getExpression());
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), 
queryExpression.getSelectedSeries());
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenExpression(context.getJobId(), 
queryExpression.getExpression());
 
     EngineTimeGenerator timestampGenerator;
     List<EngineReaderByTimeStamp> readersOfSelectedSeries;
     try {
-      timestampGenerator = new EngineTimeGenerator(jobId, 
queryExpression.getExpression(), context);
+      timestampGenerator = new 
EngineTimeGenerator(queryExpression.getExpression(), context);
       readersOfSelectedSeries = SeriesReaderFactory
-          .getByTimestampReadersOfSelectedPaths(jobId, 
queryExpression.getSelectedSeries(),
-              context);
+          
.getByTimestampReadersOfSelectedPaths(queryExpression.getSelectedSeries(), 
context);
     } catch (IOException ex) {
       throw new FileNodeManagerException(ex);
     }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 248c2c0..2d07796 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -27,8 +27,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.AllDataReader;
@@ -48,10 +47,8 @@ import 
org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 public class EngineExecutorWithoutTimeGenerator {
 
   private QueryExpression queryExpression;
-  private long jobId;
 
-  public EngineExecutorWithoutTimeGenerator(long jobId, QueryExpression 
queryExpression) {
-    this.jobId = jobId;
+  public EngineExecutorWithoutTimeGenerator(QueryExpression queryExpression) {
     this.queryExpression = queryExpression;
   }
 
@@ -66,12 +63,12 @@ public class EngineExecutorWithoutTimeGenerator {
     List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
 
-    QueryTokenManager.getInstance()
-        .beginQueryOfGivenQueryPaths(jobId, 
queryExpression.getSelectedSeries());
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), 
queryExpression.getSelectedSeries());
 
     for (Path path : queryExpression.getSelectedSeries()) {
 
-      QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId, path,
+      QueryDataSource queryDataSource = 
QueryResourceManager.getInstance().getQueryDataSource(path,
           context);
 
       // add data type
@@ -120,12 +117,12 @@ public class EngineExecutorWithoutTimeGenerator {
     List<IPointReader> readersOfSelectedSeries = new ArrayList<>();
     List<TSDataType> dataTypes = new ArrayList<>();
 
-    QueryTokenManager.getInstance()
-        .beginQueryOfGivenQueryPaths(jobId, 
queryExpression.getSelectedSeries());
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), 
queryExpression.getSelectedSeries());
 
     for (Path path : queryExpression.getSelectedSeries()) {
 
-      QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId, path,
+      QueryDataSource queryDataSource = 
QueryResourceManager.getInstance().getQueryDataSource(path,
           context);
 
       // add data type
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index 2090074..96ad5ad 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -27,9 +27,6 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
-import org.apache.iotdb.db.query.control.QuerySession;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
 import 
org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet;
 import 
org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet;
 import org.apache.iotdb.db.query.fill.IFill;
@@ -57,15 +54,9 @@ public class EngineQueryRouter {
   /**
    * execute physical plan.
    */
-  public QueryDataSet query(QueryExpression queryExpression)
+  public QueryDataSet query(QueryExpression queryExpression, QueryContext 
context)
       throws FileNodeManagerException {
 
-    long nextJobId = QuerySession.getCurrentThreadJobId();
-    QueryTokenManager.getInstance().addJobId(nextJobId);
-    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
-
-    QueryContext context = new QueryContext();
-
     if (queryExpression.hasQueryFilter()) {
       try {
         IExpression optimizedExpression = ExpressionOptimizer.getInstance()
@@ -74,12 +65,10 @@ public class EngineQueryRouter {
 
         if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
           EngineExecutorWithoutTimeGenerator engineExecutor =
-              new EngineExecutorWithoutTimeGenerator(
-                  nextJobId, queryExpression);
+              new EngineExecutorWithoutTimeGenerator(queryExpression);
           return engineExecutor.executeWithGlobalTimeFilter(context);
         } else {
           EngineExecutorWithTimeGenerator engineExecutor = new 
EngineExecutorWithTimeGenerator(
-              nextJobId,
               queryExpression);
           return engineExecutor.execute(context);
         }
@@ -89,7 +78,6 @@ public class EngineQueryRouter {
       }
     } else {
       EngineExecutorWithoutTimeGenerator engineExecutor = new 
EngineExecutorWithoutTimeGenerator(
-          nextJobId,
           queryExpression);
       return engineExecutor.executeWithoutFilter(context);
     }
@@ -99,19 +87,13 @@ public class EngineQueryRouter {
    * execute aggregation query.
    */
   public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
-      IExpression expression) throws QueryFilterOptimizationException, 
FileNodeManagerException,
-      IOException, PathErrorException, ProcessorException {
-
-    long nextJobId = QuerySession.getCurrentThreadJobId();
-    QueryTokenManager.getInstance().addJobId(nextJobId);
-    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
-
-    QueryContext context = new QueryContext();
+      IExpression expression, QueryContext context) throws 
QueryFilterOptimizationException,
+      FileNodeManagerException, IOException, PathErrorException, 
ProcessorException {
 
     if (expression != null) {
       IExpression optimizedExpression = ExpressionOptimizer.getInstance()
           .optimize(expression, selectedSeries);
-      AggregateEngineExecutor engineExecutor = new 
AggregateEngineExecutor(nextJobId,
+      AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
           selectedSeries, aggres, optimizedExpression);
       if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
         return engineExecutor.executeWithOutTimeGenerator(context);
@@ -119,7 +101,7 @@ public class EngineQueryRouter {
         return engineExecutor.executeWithTimeGenerator(context);
       }
     } else {
-      AggregateEngineExecutor engineExecutor = new 
AggregateEngineExecutor(nextJobId,
+      AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
           selectedSeries, aggres, null);
       return engineExecutor.executeWithOutTimeGenerator(context);
     }
@@ -137,15 +119,12 @@ public class EngineQueryRouter {
    * @param intervals time intervals, closed interval.
    */
   public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
-      IExpression expression, long unit, long origin, List<Pair<Long, Long>> 
intervals)
+      IExpression expression, long unit, long origin, List<Pair<Long, Long>> 
intervals,
+      QueryContext context)
       throws ProcessorException, QueryFilterOptimizationException, 
FileNodeManagerException,
       PathErrorException, IOException {
 
-    long nextJobId = QuerySession.getCurrentThreadJobId();
-    QueryTokenManager.getInstance().addJobId(nextJobId);
-    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
-
-    QueryContext context = new QueryContext();
+    long nextJobId = context.getJobId();
 
     // check the legitimacy of intervals
     for (Pair<Long, Long> pair : intervals) {
@@ -205,13 +184,12 @@ public class EngineQueryRouter {
    * @param queryTime timestamp
    * @param fillType type IFill map
    */
-  public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillType)
+  public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillType,
+      QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException {
-    long nextJobId = QuerySession.getCurrentThreadJobId();
-    QueryTokenManager.getInstance().addJobId(nextJobId);
-    OpenedFilePathsManager.getInstance().addJobId(nextJobId);
 
-    QueryContext context = new QueryContext();
+    long nextJobId = context.getJobId();
+
     FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(nextJobId, 
fillPaths, queryTime,
         fillType);
     return fillEngineExecutor.execute(context);
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
index a465bf7..83c5fa9 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
@@ -28,8 +28,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.query.fill.PreviousFill;
@@ -60,13 +59,13 @@ public class FillEngineExecutor {
    */
   public QueryDataSet execute(QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException {
-    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
+    QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
 
     List<IFill> fillList = new ArrayList<>();
     List<TSDataType> dataTypeList = new ArrayList<>();
     for (Path path : selectedSeries) {
-      QueryDataSource queryDataSource = QueryDataSourceManager
-          .getQueryDataSource(jobId, path, context);
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(path, context);
       TSDataType dataType = 
MManager.getInstance().getSeriesType(path.getFullPath());
       dataTypeList.add(dataType);
       IFill fill = null;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
index d94aa0d..e82b567 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithOnlyTimeFilterDataSetDataSet.java
@@ -29,8 +29,7 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IAggregateReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -78,13 +77,13 @@ public class GroupByWithOnlyTimeFilterDataSetDataSet 
extends GroupByEngineDataSe
       throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
     initAggreFuction(aggres);
     // init reader
-    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
+    QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
     if (expression != null) {
       timeFilter = ((GlobalTimeExpression) expression).getFilter();
     }
     for (int i = 0; i < selectedSeries.size(); i++) {
-      QueryDataSource queryDataSource = QueryDataSourceManager
-          .getQueryDataSource(jobId, selectedSeries.get(i), context);
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(selectedSeries.get(i), context);
 
       // sequence reader for sealed tsfile, unsealed tsfile, memory
       SequenceDataReader sequenceReader = new 
SequenceDataReader(queryDataSource.getSeqDataSource(),
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
index d995737..7a6b3d2 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/groupby/GroupByWithValueFilterDataSetDataSet.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
@@ -73,11 +73,12 @@ public class GroupByWithValueFilterDataSetDataSet extends 
GroupByEngineDataSet {
       throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
     initAggreFuction(aggres);
 
-    QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, 
expression);
-    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
-    this.timestampGenerator = new EngineTimeGenerator(jobId, expression, 
context);
+    
QueryResourceManager.getInstance().beginQueryOfGivenExpression(context.getJobId(),
 expression);
+    QueryResourceManager
+        .getInstance().beginQueryOfGivenQueryPaths(context.getJobId(), 
selectedSeries);
+    this.timestampGenerator = new EngineTimeGenerator(expression, context);
     this.allDataReaderList = SeriesReaderFactory
-        .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context);
+        .getByTimestampReadersOfSelectedPaths(selectedSeries, context);
   }
 
   @Override
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 98b16db..a3c29ba 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.reader.AllDataReader;
 import org.apache.iotdb.db.query.reader.IBatchReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -187,19 +187,18 @@ public class SeriesReaderFactory {
   /**
    * construct ByTimestampReader, include sequential data and unsequential 
data.
    *
-   * @param jobId query jobId
    * @param paths selected series path
    * @param context query context
    * @return the list of EngineReaderByTimeStamp
    */
-  public static List<EngineReaderByTimeStamp> 
getByTimestampReadersOfSelectedPaths(long jobId,
+  public static List<EngineReaderByTimeStamp> 
getByTimestampReadersOfSelectedPaths(
       List<Path> paths, QueryContext context) throws IOException, 
FileNodeManagerException {
 
     List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
 
     for (Path path : paths) {
 
-      QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId, path,
+      QueryDataSource queryDataSource = 
QueryResourceManager.getInstance().getQueryDataSource(path,
           context);
 
       PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new 
PriorityMergeReaderByTimestamp();
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index c5aa51f..4ffe62b 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.AllDataReader;
 import org.apache.iotdb.db.query.reader.IReader;
@@ -44,10 +44,8 @@ import 
org.apache.iotdb.tsfile.read.query.timegenerator.node.OrNode;
 
 public class EngineNodeConstructor {
 
-  private long jobId;
 
-  public EngineNodeConstructor(long jobId) {
-    this.jobId = jobId;
+  public EngineNodeConstructor() {
   }
 
   /**
@@ -89,7 +87,7 @@ public class EngineNodeConstructor {
       QueryContext context)
       throws IOException, FileNodeManagerException {
 
-    QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId,
+    QueryDataSource queryDataSource = 
QueryResourceManager.getInstance().getQueryDataSource(
         singleSeriesExpression.getSeriesPath(), context);
 
     Filter filter = singleSeriesExpression.getFilter();
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
index 897a7d1..350ea6f 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
@@ -34,20 +34,18 @@ public class EngineTimeGenerator implements TimeGenerator {
 
   private IExpression expression;
   private Node operatorNode;
-  private long jobId;
 
   /**
    * Constructor of EngineTimeGenerator.
    */
-  public EngineTimeGenerator(long jobId, IExpression expression, QueryContext 
context)
+  public EngineTimeGenerator(IExpression expression, QueryContext context)
       throws FileNodeManagerException {
-    this.jobId = jobId;
     this.expression = expression;
     initNode(context);
   }
 
   private void initNode(QueryContext context) throws FileNodeManagerException {
-    EngineNodeConstructor engineNodeConstructor = new 
EngineNodeConstructor(jobId);
+    EngineNodeConstructor engineNodeConstructor = new EngineNodeConstructor();
     this.operatorNode = engineNodeConstructor.construct(expression, context);
   }
 
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java
 b/iotdb/src/main/java/org/apache/iotdb/db/rescon/package-info.java
similarity index 81%
copy from 
iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java
copy to iotdb/src/main/java/org/apache/iotdb/db/rescon/package-info.java
index 3ac1540..96c83c9 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/rescon/package-info.java
@@ -1,29 +1,24 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.control;
-
-import org.junit.Test;
-
-public class QueryTokenManagerTest {
-
-  @Test
-  public void test() {
-    //TODO
-  }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * rescon means Resource Control, classes in this package provide global 
control over various
+ * resources shared in IoTDB.
+ */
+package org.apache.iotdb.db.rescon;
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index c63ec74..d85f056 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -105,7 +105,6 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(CloseMergeService.getInstance());
     registerManager.register(StatMonitor.getInstance());
     registerManager.register(BasicMemController.getInstance());
-    registerManager.register(FileReaderManager.getInstance());
     registerManager.register(SyncServerManager.getInstance());
 
     JMXService.registerMBean(getInstance(), mbeanName);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index fca74d1..ad3c855 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -49,8 +49,8 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
-import org.apache.iotdb.db.query.control.QuerySession;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationResp;
@@ -104,6 +104,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   private ThreadLocal<HashMap<String, QueryDataSet>> queryRet = new 
ThreadLocal<>();
   private ThreadLocal<ZoneId> zoneIds = new ThreadLocal<>();
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private QueryContext context;
 
   public TSServiceImpl() throws IOException {
     // do nothing because there is no need
@@ -185,7 +186,9 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME);
     try {
       // end query for all the query tokens created by current thread
-      
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+      if(context != null) {
+        
QueryResourceManager.getInstance().endQueryForGivenJob(context.getJobId());
+      }
 
       clearAllStatusForCurrentRequest();
     } catch (FileNodeManagerException e) {
@@ -590,7 +593,11 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
       if (!queryRet.get().containsKey(statement)) {
         PhysicalPlan physicalPlan = queryStatus.get().get(statement);
         processor.getExecutor().setFetchSize(fetchSize);
-        queryDataSet = processor.getExecutor().processQuery((QueryPlan) 
physicalPlan);
+
+        context = new QueryContext();
+        context.setJobId(QueryResourceManager.getInstance().assignJobId());
+
+        queryDataSet = processor.getExecutor().processQuery((QueryPlan) 
physicalPlan, context);
         queryRet.get().put(statement, queryDataSet);
       } else {
         queryDataSet = queryRet.get().get(statement);
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 35bfc84..03d3e57 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.engine.modification;
 
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
@@ -35,8 +37,9 @@ import 
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.MetadataArgsErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -67,7 +70,9 @@ public class DeletionFileNodeTest {
 
   @Before
   public void setup() throws MetadataArgsErrorException,
-      PathErrorException, IOException, FileNodeManagerException {
+      PathErrorException, IOException, FileNodeManagerException, 
StartupException {
+    EnvironmentUtils.envSetUp();
+
     MManager.getInstance().setStorageLevelToMTree(processorName);
     for (int i = 0; i < 10; i++) {
       MManager.getInstance().addPathToMTree(processorName + "." + 
measurements[i], dataType,
@@ -103,8 +108,10 @@ public class DeletionFileNodeTest {
 
     SingleSeriesExpression expression = new SingleSeriesExpression(new 
Path(processorName,
         measurements[5]), null);
-    QueryContext context = new QueryContext();
-    QueryDataSource dataSource = 
FileNodeManager.getInstance().query(expression, context);
+    
QueryResourceManager.getInstance().beginQueryOfGivenExpression(TEST_QUERY_JOB_ID,
 expression);
+    QueryDataSource dataSource = QueryResourceManager.getInstance()
+        .getQueryDataSource(expression.getSeriesPath(), TEST_QUERY_CONTEXT);
+
     Iterator<TimeValuePair> timeValuePairs =
         dataSource.getSeqDataSource().getReadableChunk().getIterator();
     int count = 0;
@@ -113,6 +120,7 @@ public class DeletionFileNodeTest {
       count++;
     }
     assertEquals(50, count);
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
@@ -185,8 +193,11 @@ public class DeletionFileNodeTest {
 
     SingleSeriesExpression expression = new SingleSeriesExpression(new 
Path(processorName,
         measurements[5]), null);
-    QueryContext context = new QueryContext();
-    QueryDataSource dataSource = 
FileNodeManager.getInstance().query(expression, context);
+
+    
QueryResourceManager.getInstance().beginQueryOfGivenExpression(TEST_QUERY_JOB_ID,
 expression);
+    QueryDataSource dataSource = QueryResourceManager.getInstance()
+        .getQueryDataSource(expression.getSeriesPath(), TEST_QUERY_CONTEXT);
+
     Iterator<TimeValuePair> timeValuePairs =
         
dataSource.getOverflowSeriesDataSource().getReadableMemChunk().getIterator();
     int count = 0;
@@ -195,6 +206,8 @@ public class DeletionFileNodeTest {
       count++;
     }
     assertEquals(50, count);
+
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
index c3e7dc3..3e9bb99 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.modification;
 
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
@@ -30,6 +31,7 @@ import 
org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.MetadataArgsErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -63,7 +65,9 @@ public class DeletionQueryTest {
 
   @Before
   public void setup() throws MetadataArgsErrorException,
-      PathErrorException, IOException, FileNodeManagerException {
+      PathErrorException, IOException, FileNodeManagerException, 
StartupException {
+    EnvironmentUtils.envSetUp();
+
     MManager.getInstance().setStorageLevelToMTree(processorName);
     for (int i = 0; i < 10; i++) {
       MManager.getInstance().addPathToMTree(processorName + "." + 
measurements[i], dataType,
@@ -103,7 +107,7 @@ public class DeletionQueryTest {
     pathList.add(new Path(processorName, measurements[5]));
 
     QueryExpression queryExpression = QueryExpression.create(pathList, null);
-    QueryDataSet dataSet = router.query(queryExpression);
+    QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
 
     int count = 0;
     while (dataSet.hasNext()) {
@@ -134,7 +138,7 @@ public class DeletionQueryTest {
     pathList.add(new Path(processorName, measurements[5]));
 
     QueryExpression queryExpression = QueryExpression.create(pathList, null);
-    QueryDataSet dataSet = router.query(queryExpression);
+    QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
 
     int count = 0;
     while (dataSet.hasNext()) {
@@ -176,7 +180,7 @@ public class DeletionQueryTest {
     pathList.add(new Path(processorName, measurements[5]));
 
     QueryExpression queryExpression = QueryExpression.create(pathList, null);
-    QueryDataSet dataSet = router.query(queryExpression);
+    QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
 
     int count = 0;
     while (dataSet.hasNext()) {
@@ -218,7 +222,7 @@ public class DeletionQueryTest {
     pathList.add(new Path(processorName, measurements[5]));
 
     QueryExpression queryExpression = QueryExpression.create(pathList, null);
-    QueryDataSet dataSet = router.query(queryExpression);
+    QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
 
     int count = 0;
     while (dataSet.hasNext()) {
@@ -282,7 +286,7 @@ public class DeletionQueryTest {
     pathList.add(new Path(processorName, measurements[5]));
 
     QueryExpression queryExpression = QueryExpression.create(pathList, null);
-    QueryDataSet dataSet = router.query(queryExpression);
+    QueryDataSet dataSet = router.query(queryExpression, TEST_QUERY_CONTEXT);
 
     int count = 0;
     while (dataSet.hasNext()) {
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
index 82f7c23..e172971 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.integration;
 
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -29,7 +30,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
+import org.apache.iotdb.db.query.control.JobFileManager;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -197,10 +198,8 @@ public class IoTDBEngineTimeGeneratorIT {
 
     SingleSeriesExpression singleSeriesExpression = new 
SingleSeriesExpression(pd0s0,
         FilterFactory.and(valueGtEq, timeGt));
-    OpenedFilePathsManager.getInstance().addJobId(0);
-    QueryContext context = new QueryContext();
-    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, 
singleSeriesExpression,
-        context);
+    EngineTimeGenerator timeGenerator = new 
EngineTimeGenerator(singleSeriesExpression,
+        TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (timeGenerator.hasNext()) {
@@ -222,11 +221,9 @@ public class IoTDBEngineTimeGeneratorIT {
     Path pd1s0 = new Path(Constant.d1s0);
     ValueFilter.ValueGtEq valueGtEq = ValueFilter.gtEq(5);
 
-    OpenedFilePathsManager.getInstance().addJobId(0);
     IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0, 
valueGtEq);
-    QueryContext context = new QueryContext();
-    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, 
singleSeriesExpression,
-        context);
+    EngineTimeGenerator timeGenerator = new 
EngineTimeGenerator(singleSeriesExpression,
+        TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (timeGenerator.hasNext()) {
@@ -258,9 +255,8 @@ public class IoTDBEngineTimeGeneratorIT {
     IExpression andExpression = BinaryExpression
         .and(singleSeriesExpression1, singleSeriesExpression2);
 
-    OpenedFilePathsManager.getInstance().addJobId(0);
-    QueryContext context = new QueryContext();
-    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, 
andExpression, context);
+    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(andExpression,
+        TEST_QUERY_CONTEXT);
     int cnt = 0;
     while (timeGenerator.hasNext()) {
       long time = timeGenerator.next();
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
index d7c091e..a4b95a3 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.integration;
 
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -27,8 +29,8 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.query.control.QuerySession;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -179,7 +181,9 @@ public class IoTDBSequenceDataQueryIT {
     queryExpression.addSelectedPath(new Path(Constant.d1s1));
     queryExpression.setExpression(null);
 
-    QueryDataSet queryDataSet = engineExecutor.query(queryExpression);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    QueryDataSet queryDataSet = engineExecutor.query(queryExpression, 
TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (queryDataSet.hasNext()) {
@@ -189,7 +193,7 @@ public class IoTDBSequenceDataQueryIT {
     }
     assertEquals(1000, cnt);
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
@@ -202,7 +206,9 @@ public class IoTDBSequenceDataQueryIT {
 
     GlobalTimeExpression globalTimeExpression = new 
GlobalTimeExpression(TimeFilter.gtEq(800L));
     queryExpression.setExpression(globalTimeExpression);
-    QueryDataSet queryDataSet = engineExecutor.query(queryExpression);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    QueryDataSet queryDataSet = engineExecutor.query(queryExpression, 
TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (queryDataSet.hasNext()) {
@@ -215,7 +221,7 @@ public class IoTDBSequenceDataQueryIT {
     }
     assertEquals(350, cnt);
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
@@ -236,7 +242,9 @@ public class IoTDBSequenceDataQueryIT {
         ValueFilter.gtEq(14));
     queryExpression.setExpression(singleSeriesExpression);
 
-    QueryDataSet queryDataSet = engineExecutor.query(queryExpression);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    QueryDataSet queryDataSet = engineExecutor.query(queryExpression, 
TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (queryDataSet.hasNext()) {
@@ -246,7 +254,7 @@ public class IoTDBSequenceDataQueryIT {
     }
     assertEquals(count, cnt);
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
 }
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index 1c991ae..6b8d413 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.integration;
 
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -29,8 +31,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.query.control.QuerySession;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -254,7 +256,9 @@ public class IoTDBSeriesReaderIT {
     queryExpression.addSelectedPath(new Path(Constant.d1s1));
     queryExpression.setExpression(null);
 
-    QueryDataSet queryDataSet = engineExecutor.query(queryExpression);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    QueryDataSet queryDataSet = engineExecutor.query(queryExpression, 
TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (queryDataSet.hasNext()) {
@@ -263,7 +267,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(23400, cnt);
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
@@ -280,7 +284,9 @@ public class IoTDBSeriesReaderIT {
         ValueFilter.gtEq(20));
     queryExpression.setExpression(singleSeriesExpression);
 
-    QueryDataSet queryDataSet = engineExecutor.query(queryExpression);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    QueryDataSet queryDataSet = engineExecutor.query(queryExpression, 
TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (queryDataSet.hasNext()) {
@@ -291,7 +297,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(16440, cnt);
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
@@ -306,7 +312,9 @@ public class IoTDBSeriesReaderIT {
     SingleSeriesExpression expression = new SingleSeriesExpression(path, 
TimeFilter.gt(22987L));
     queryExpression.setExpression(expression);
 
-    QueryDataSet queryDataSet = engineExecutor.query(queryExpression);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    QueryDataSet queryDataSet = engineExecutor.query(queryExpression, 
TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (queryDataSet.hasNext()) {
@@ -317,7 +325,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(3012, cnt);
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
@@ -333,7 +341,9 @@ public class IoTDBSeriesReaderIT {
         ValueFilter.lt(111));
     queryExpression.setExpression(singleSeriesExpression);
 
-    QueryDataSet queryDataSet = engineExecutor.query(queryExpression);
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    QueryDataSet queryDataSet = engineExecutor.query(queryExpression, 
TEST_QUERY_CONTEXT);
 
     int cnt = 0;
     while (queryDataSet.hasNext()) {
@@ -344,7 +354,7 @@ public class IoTDBSeriesReaderIT {
     }
     assertEquals(22800, cnt);
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
   }
 
   @Test
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java 
b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
index 42e279d..7caaa59 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.plan;
 
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -123,7 +124,8 @@ public class QPUpdateTest {
     // query to assert
     sqlStr = "select sensor_1,sensor_2 from root.qp_update_test.device_1";
     PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
-    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2);
+    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2,
+        TEST_QUERY_CONTEXT);
     String[] expect = {"10     33000   null", "20      null    10"};
     int i = 0;
     while (queryDataSet.hasNext()) {
@@ -145,7 +147,8 @@ public class QPUpdateTest {
     PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
     // 
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
 "sensor_1");
     // 
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
 "sensor_2");
-    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2);
+    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2,
+        TEST_QUERY_CONTEXT);
 
     String[] expect = {"20     null    10"};
     int i = 0;
@@ -168,7 +171,8 @@ public class QPUpdateTest {
     PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
     // 
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
 "sensor_1");
     // 
RecordReaderFactory.getInstance().removeRecordReader("root.qp_update_test.device_1",
 "sensor_2");
-    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2);
+    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2,
+        TEST_QUERY_CONTEXT);
 
     String[] expect = {"20     null    10"};
     int i = 0;
@@ -193,7 +197,8 @@ public class QPUpdateTest {
     // query to assert
     sqlStr = "select sensor_1,sensor_2 from root.qp_update_test.device_1";
     PhysicalPlan plan2 = processor.parseSQLToPhysicalPlan(sqlStr);
-    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2);
+    QueryDataSet queryDataSet = 
processor.getExecutor().processQuery((QueryPlan) plan2,
+        TEST_QUERY_CONTEXT);
 
     String[] expect = {"13     50      40", "20        null    10"};
     int i = 0;
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java 
b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 07983b3..1446cb1 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.fill.IFill;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -108,7 +109,8 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
   }
 
   @Override
-  public QueryDataSet aggregate(List<Path> paths, List<String> aggres, 
IExpression expression)
+  public QueryDataSet aggregate(List<Path> paths, List<String> aggres, 
IExpression expression,
+      QueryContext context)
       throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException,
       QueryFilterOptimizationException {
     return null;
@@ -116,14 +118,15 @@ public class MemIntQpExecutor extends 
QueryProcessExecutor {
 
   @Override
   public QueryDataSet groupBy(List<Path> paths, List<String> aggres, 
IExpression expression,
-      long unit, long origin, List<Pair<Long, Long>> intervals)
+      long unit, long origin, List<Pair<Long, Long>> intervals, QueryContext 
context)
       throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException,
       QueryFilterOptimizationException {
     return null;
   }
 
   @Override
-  public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillTypes)
+  public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillTypes,
+      QueryContext context)
       throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException {
     return null;
   }
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index 81d6f30..970489d 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.control;
 
+import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
 import static org.junit.Assert.fail;
 
 import java.io.File;
@@ -54,6 +55,7 @@ public class FileReaderManagerTest {
     String filePath = "target/test.file";
 
     FileReaderManager manager = FileReaderManager.getInstance();
+    JobFileManager testManager = new JobFileManager();
 
     for (int i = 1; i <= MAX_FILE_SIZE; i++) {
       File file = new File(filePath + i);
@@ -62,10 +64,10 @@ public class FileReaderManagerTest {
 
     Thread t1 = new Thread(() -> {
       try {
-        OpenedFilePathsManager.getInstance().addJobId(1L);
+        testManager.addJobId(1L);
 
         for (int i = 1; i <= 6; i++) {
-          OpenedFilePathsManager.getInstance().addFilePathToMap(1L, filePath + 
i,
+          testManager.addFilePathToMap(1L, filePath + i,
               false);
           manager.get(filePath + i, false);
           Assert.assertTrue(manager.contains(filePath + i, false));
@@ -80,10 +82,10 @@ public class FileReaderManagerTest {
 
     Thread t2 = new Thread(() -> {
       try {
-        OpenedFilePathsManager.getInstance().addJobId(2L);
+        testManager.addJobId(2L);
 
         for (int i = 4; i <= MAX_FILE_SIZE; i++) {
-          OpenedFilePathsManager.getInstance().addFilePathToMap(2L, filePath + 
i,
+          testManager.addFilePathToMap(2L, filePath + i,
               false);
           manager.get(filePath + i, false);
           Assert.assertTrue(manager.contains(filePath + i, false));
@@ -120,7 +122,6 @@ public class FileReaderManagerTest {
     // }
     // }
 
-    
OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(QuerySession.getCurrentThreadJobId());
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
     for (int i = 1; i < MAX_FILE_SIZE; i++) {
       File file = new File(filePath + i);
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryResourceManagerTest.java
similarity index 93%
rename from 
iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java
rename to 
iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryResourceManagerTest.java
index 3ac1540..930ab6f 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryTokenManagerTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/query/control/QueryResourceManagerTest.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.junit.Test;
 
-public class QueryTokenManagerTest {
+public class QueryResourceManagerTest {
 
   @Test
   public void test() {
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java 
b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index ad5a007..d4d9c63 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -34,9 +34,9 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.control.QuerySession;
-import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -59,9 +59,12 @@ public class EnvironmentUtils {
   private static Directories directories = Directories.getInstance();
   private static TSFileConfig tsfileConfig = 
TSFileDescriptor.getInstance().getConfig();
 
+  public static long TEST_QUERY_JOB_ID = 
QueryResourceManager.getInstance().assignJobId();
+  public static QueryContext TEST_QUERY_CONTEXT = new 
QueryContext(TEST_QUERY_JOB_ID);
+
   public static void cleanEnv() throws IOException, FileNodeManagerException {
 
-    
QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId());
+    QueryResourceManager.getInstance().endQueryForGivenJob(TEST_QUERY_JOB_ID);
 
     // clear opened file streams
     FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
@@ -169,5 +172,7 @@ public class EnvironmentUtils {
     }
     FileNodeManager.getInstance().resetFileNodeManager();
     MultiFileLogNodeManager.getInstance().start();
+    TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
+    TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
   }
 }

Reply via email to