This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch threadlocal_for_query in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 291f361bbc07a4688e025b0e31cd305dd7de0f10 Author: xiangdong huang <[email protected]> AuthorDate: Fri Mar 29 01:01:56 2019 +0800 refactor QueryTokenManager: move threadlocal as QuerySession --- .../db/query/control/OpenedFilePathsManager.java | 18 ++---- .../db/query/control/QueryDataSourceManager.java | 2 +- .../iotdb/db/query/control/QuerySession.java | 73 ++++++++++++++++++++++ .../iotdb/db/query/control/QueryTokenManager.java | 31 ++++----- .../iotdb/db/query/executor/EngineQueryRouter.java | 38 +++++------ .../org/apache/iotdb/db/service/TSServiceImpl.java | 7 +-- .../db/integration/IoTDBEngineTimeGeneratorIT.java | 6 +- .../db/integration/IoTDBSequenceDataQueryIT.java | 7 ++- .../iotdb/db/integration/IoTDBSeriesReaderIT.java | 9 +-- .../db/query/control/FileReaderManagerTest.java | 6 +- .../apache/iotdb/db/utils/EnvironmentUtils.java | 3 +- 11 files changed, 124 insertions(+), 76 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java index e6ffba3..409b369 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java @@ -32,10 +32,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; */ public class OpenedFilePathsManager { - /** - * Each jdbc request has an unique jod id, job id is stored in thread local variable jobIdContainer. - */ - private ThreadLocal<Long> jobIdContainer; + /** * Map<jobId, Set<filePaths>> @@ -44,7 +41,6 @@ public class OpenedFilePathsManager { private ConcurrentHashMap<Long, Set<String>> unclosedFilePathsMap; private OpenedFilePathsManager() { - jobIdContainer = new ThreadLocal<>(); closedFilePathsMap = new ConcurrentHashMap<>(); unclosedFilePathsMap = new ConcurrentHashMap<>(); } @@ -56,8 +52,7 @@ public class OpenedFilePathsManager { /** * Set job id for current request thread. When a query request is created firstly, this method must be invoked. */ - public void setJobIdForCurrentRequestThread(long jobId) { - jobIdContainer.set(jobId); + public void addJobId(long jobId) { closedFilePathsMap.put(jobId, new HashSet<>()); unclosedFilePathsMap.put(jobId, new HashSet<>()); } @@ -65,7 +60,7 @@ public class OpenedFilePathsManager { /** * Add the unique file paths to closedFilePathsMap and unclosedFilePathsMap. */ - void addUsedFilesForCurrentRequestThread(long jobId, QueryDataSource dataSource) { + public void addUsedFilesForGivenJob(long jobId, QueryDataSource dataSource) { for (TsFileResource tsFileResource : dataSource.getSeqDataSource().getSealedTsFiles()) { String sealedFilePath = tsFileResource.getFilePath(); addFilePathToMap(jobId, sealedFilePath, true); @@ -88,11 +83,7 @@ public class OpenedFilePathsManager { * Whenever the jdbc request is closed normally or abnormally, this method must be invoked. All file paths used by * this jdbc request must be cleared and thus the usage reference must be decreased. */ - public void removeUsedFilesForCurrentRequestThread() { - if (jobIdContainer.get() != null) { - long jobId = jobIdContainer.get(); - jobIdContainer.remove(); - + public void removeUsedFilesForGivenJob(long jobId) { for (String filePath : closedFilePathsMap.get(jobId)) { FileReaderManager.getInstance().decreaseFileReaderReference(filePath, false); } @@ -101,7 +92,6 @@ public class OpenedFilePathsManager { FileReaderManager.getInstance().decreaseFileReaderReference(filePath, true); } unclosedFilePathsMap.remove(jobId); - } } /** diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java index f3fd1f3..add4e19 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java @@ -46,7 +46,7 @@ public class QueryDataSourceManager { // add used files to current thread request cached map OpenedFilePathsManager.getInstance() - .addUsedFilesForCurrentRequestThread(jobId, queryDataSource); + .addUsedFilesForGivenJob(jobId, queryDataSource); return queryDataSource; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java new file mode 100644 index 0000000..20415c6 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.control; + +import java.util.concurrent.atomic.AtomicLong; + +public class QuerySession { + /** + * Each jdbc request has an unique jod id, job id is stored in thread local variable jobIdContainer. + */ + private ThreadLocal<Long> jobId; + + /** + * Each unique jdbc request(query, aggregation or others job) has an unique job id. This job id + * will always be maintained until the request is closed. In each job, the unique file will be + * only opened once to avoid too many opened files error. + */ + private AtomicLong jobIdGenerator = new AtomicLong(); + + private QuerySession() { + this.jobId = new ThreadLocal<Long>(){ + @Override + protected Long initialValue() { + super.initialValue(); + long id = jobIdGenerator.incrementAndGet(); + OpenedFilePathsManager.getInstance().addJobId(id); + QueryTokenManager.getInstance().addJobId(id); + return id; + } + }; + } + + public long getJobId() { + return jobId.get(); + } + + public static long getCurrentThreadJobId() { + return QuerySessionHelper.INSTANCE.jobId.get(); + } + + private static class QuerySessionHelper { + + private static final QuerySession INSTANCE = new QuerySession(); + + private QuerySessionHelper() { + } + } + + public static QuerySession getCurrentThreadQuerySession() { + return QuerySessionHelper.INSTANCE; + } + + + + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java index 50a2cb4..44ee763 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java @@ -36,18 +36,13 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; * <p> * Singleton pattern, to manage all query tokens. Each jdbc query request can query multiple series, * in the processing of querying different device id, the <code>FileNodeManager.getInstance(). - * beginQuery</code> and <code>FileNodeManager.getInstance().endQuery</code> must be invoked in the + * beginQuery</code> and <code>FileNodeManager.getInstance().endQueryForGivenJob</code> must be invoked in the * beginning and ending of jdbc request. * </p> */ public class QueryTokenManager { /** - * Each jdbc request has unique jod id, job id is stored in thread local variable jobContainer. - */ - private ThreadLocal<Long> jobContainer; - - /** * Map<jobId, Map<deviceId, List<token>>>. * * <p> @@ -72,21 +67,21 @@ public class QueryTokenManager { * <code>FileNodeManager.getInstance().beginQuery(device_2)</code> will be invoked again, it * returns result token `3` and `4` . * - * <code>FileNodeManager.getInstance().endQuery(device_1, 1)</code> and - * <code>FileNodeManager.getInstance().endQuery(device_2, 2)</code> must be invoked no matter how + * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_1, 1)</code> and + * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_2, 2)</code> must be invoked no matter how * query process Q1 exits normally or abnormally. So is Q2, - * <code>FileNodeManager.getInstance().endQuery(device_1, 3)</code> and - * <code>FileNodeManager.getInstance().endQuery(device_2, 4)</code> must be invoked + * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_1, 3)</code> and + * <code>FileNodeManager.getInstance().endQueryForGivenJob(device_2, 4)</code> must be invoked * * Last but no least, to ensure the correctness of write process and query process of IoTDB, * <code>FileNodeManager.getInstance().beginQuery()</code> and - * <code>FileNodeManager.getInstance().endQuery()</code> must be executed rightly. + * <code>FileNodeManager.getInstance().endQueryForGivenJob()</code> must be executed rightly. * </p> */ private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>> queryTokensMap; + private QueryTokenManager() { - jobContainer = new ThreadLocal<>(); queryTokensMap = new ConcurrentHashMap<>(); } @@ -98,8 +93,7 @@ public class QueryTokenManager { * Set job id for current request thread. When a query request is created firstly, this method * must be invoked. */ - public void setJobIdForCurrentRequestThread(long jobId) { - jobContainer.set(jobId); + public void addJobId(long jobId) { queryTokensMap.put(jobId, new ConcurrentHashMap<>()); } @@ -136,18 +130,15 @@ public class QueryTokenManager { * Whenever the jdbc request is closed normally or abnormally, this method must be invoked. All * query tokens created by this jdbc request must be cleared. */ - public void endQueryForCurrentRequestThread() throws FileNodeManagerException { - if (jobContainer.get() != null) { - long jobId = jobContainer.get(); - jobContainer.remove(); - + public void endQueryForGivenJob(long jobId) throws FileNodeManagerException { for (Map.Entry<String, List<Integer>> entry : queryTokensMap.get(jobId).entrySet()) { for (int token : entry.getValue()) { FileNodeManager.getInstance().endQuery(entry.getKey(), token); } } queryTokensMap.remove(jobId); - } + // remove usage of opened file paths of current thread + OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(jobId); } private void getUniquePaths(IExpression expression, Set<String> deviceIdSet) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java index 2e3ce45..2090074 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java @@ -23,12 +23,12 @@ import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.OpenedFilePathsManager; +import org.apache.iotdb.db.query.control.QuerySession; import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.db.query.executor.groupby.GroupByWithOnlyTimeFilterDataSetDataSet; import org.apache.iotdb.db.query.executor.groupby.GroupByWithValueFilterDataSetDataSet; @@ -52,12 +52,7 @@ import org.apache.iotdb.tsfile.utils.Pair; */ public class EngineQueryRouter { - /** - * Each unique jdbc request(query, aggregation or others job) has an unique job id. This job id - * will always be maintained until the request is closed. In each job, the unique file will be - * only opened once to avoid too many opened files error. - */ - private AtomicLong jobIdGenerator = new AtomicLong(); + /** * execute physical plan. @@ -65,9 +60,9 @@ public class EngineQueryRouter { public QueryDataSet query(QueryExpression queryExpression) throws FileNodeManagerException { - long nextJobId = getNextJobId(); - QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); + long nextJobId = QuerySession.getCurrentThreadJobId(); + QueryTokenManager.getInstance().addJobId(nextJobId); + OpenedFilePathsManager.getInstance().addJobId(nextJobId); QueryContext context = new QueryContext(); @@ -107,9 +102,9 @@ public class EngineQueryRouter { IExpression expression) throws QueryFilterOptimizationException, FileNodeManagerException, IOException, PathErrorException, ProcessorException { - long nextJobId = getNextJobId(); - QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); + long nextJobId = QuerySession.getCurrentThreadJobId(); + QueryTokenManager.getInstance().addJobId(nextJobId); + OpenedFilePathsManager.getInstance().addJobId(nextJobId); QueryContext context = new QueryContext(); @@ -146,9 +141,10 @@ public class EngineQueryRouter { throws ProcessorException, QueryFilterOptimizationException, FileNodeManagerException, PathErrorException, IOException { - long nextJobId = getNextJobId(); - QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); + long nextJobId = QuerySession.getCurrentThreadJobId(); + QueryTokenManager.getInstance().addJobId(nextJobId); + OpenedFilePathsManager.getInstance().addJobId(nextJobId); + QueryContext context = new QueryContext(); // check the legitimacy of intervals @@ -211,9 +207,9 @@ public class EngineQueryRouter { */ public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType) throws FileNodeManagerException, PathErrorException, IOException { - long nextJobId = getNextJobId(); - QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId); + long nextJobId = QuerySession.getCurrentThreadJobId(); + QueryTokenManager.getInstance().addJobId(nextJobId); + OpenedFilePathsManager.getInstance().addJobId(nextJobId); QueryContext context = new QueryContext(); FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(nextJobId, fillPaths, queryTime, @@ -244,7 +240,5 @@ public class EngineQueryRouter { return merged; } - private synchronized long getNextJobId() { - return jobIdGenerator.incrementAndGet(); - } + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 834dde3..c4877d8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -49,7 +49,7 @@ import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; -import org.apache.iotdb.db.query.control.OpenedFilePathsManager; +import org.apache.iotdb.db.query.control.QuerySession; import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.service.rpc.thrift.ServerProperties; import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; @@ -185,10 +185,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { LOGGER.info("{}: receive close operation", IoTDBConstant.GLOBAL_DB_NAME); try { // end query for all the query tokens created by current thread - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); - - // remove usage of opened file paths of current thread - OpenedFilePathsManager.getInstance().removeUsedFilesForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); clearAllStatusForCurrentRequest(); } catch (FileNodeManagerException e) { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java index 2a45c29..82f7c23 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java @@ -197,7 +197,7 @@ public class IoTDBEngineTimeGeneratorIT { SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(pd0s0, FilterFactory.and(valueGtEq, timeGt)); - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0); + OpenedFilePathsManager.getInstance().addJobId(0); QueryContext context = new QueryContext(); EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression, context); @@ -222,7 +222,7 @@ public class IoTDBEngineTimeGeneratorIT { Path pd1s0 = new Path(Constant.d1s0); ValueFilter.ValueGtEq valueGtEq = ValueFilter.gtEq(5); - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0); + OpenedFilePathsManager.getInstance().addJobId(0); IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0, valueGtEq); QueryContext context = new QueryContext(); EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression, @@ -258,7 +258,7 @@ public class IoTDBEngineTimeGeneratorIT { IExpression andExpression = BinaryExpression .and(singleSeriesExpression1, singleSeriesExpression2); - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0); + OpenedFilePathsManager.getInstance().addJobId(0); QueryContext context = new QueryContext(); EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression, context); int cnt = 0; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java index b910437..d7c091e 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java @@ -27,6 +27,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.query.control.QuerySession; import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.service.IoTDB; @@ -188,7 +189,7 @@ public class IoTDBSequenceDataQueryIT { } assertEquals(1000, cnt); - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); } @Test @@ -214,7 +215,7 @@ public class IoTDBSequenceDataQueryIT { } assertEquals(350, cnt); - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); } @Test @@ -245,7 +246,7 @@ public class IoTDBSequenceDataQueryIT { } assertEquals(count, cnt); - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java index 387310b..1c991ae 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java @@ -29,6 +29,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.query.control.QuerySession; import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.db.query.executor.EngineQueryRouter; import org.apache.iotdb.db.service.IoTDB; @@ -262,7 +263,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(23400, cnt); - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); } @Test @@ -290,7 +291,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(16440, cnt); - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); } @Test @@ -316,7 +317,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(3012, cnt); - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); } @Test @@ -343,7 +344,7 @@ public class IoTDBSeriesReaderIT { } assertEquals(22800, cnt); - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); } @Test diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java index fe17a1a..81d6f30 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java @@ -62,7 +62,7 @@ public class FileReaderManagerTest { Thread t1 = new Thread(() -> { try { - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(1L); + OpenedFilePathsManager.getInstance().addJobId(1L); for (int i = 1; i <= 6; i++) { OpenedFilePathsManager.getInstance().addFilePathToMap(1L, filePath + i, @@ -80,7 +80,7 @@ public class FileReaderManagerTest { Thread t2 = new Thread(() -> { try { - OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(2L); + OpenedFilePathsManager.getInstance().addJobId(2L); for (int i = 4; i <= MAX_FILE_SIZE; i++) { OpenedFilePathsManager.getInstance().addFilePathToMap(2L, filePath + i, @@ -120,7 +120,7 @@ public class FileReaderManagerTest { // } // } - OpenedFilePathsManager.getInstance().removeUsedFilesForCurrentRequestThread(); + OpenedFilePathsManager.getInstance().removeUsedFilesForGivenJob(QuerySession.getCurrentThreadJobId()); FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); for (int i = 1; i < MAX_FILE_SIZE; i++) { File file = new File(filePath + i); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index e45fe1f..ad5a007 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.query.control.QuerySession; import org.apache.iotdb.db.query.control.QueryTokenManager; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -60,7 +61,7 @@ public class EnvironmentUtils { public static void cleanEnv() throws IOException, FileNodeManagerException { - QueryTokenManager.getInstance().endQueryForCurrentRequestThread(); + QueryTokenManager.getInstance().endQueryForGivenJob(QuerySession.getCurrentThreadJobId()); // clear opened file streams FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
