This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch threadlocal_for_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/threadlocal_for_query by this
push:
new 65ab1e6 fix multiply remove jobId
65ab1e6 is described below
commit 65ab1e6763c1de41f47840b7b16a6cfe5108da26
Author: xiangdong huang <[email protected]>
AuthorDate: Fri Mar 29 14:33:11 2019 +0800
fix multiply remove jobId
---
.../db/query/control/OpenedFilePathsManager.java | 34 +++++++++++-----------
.../iotdb/db/query/control/QuerySession.java | 4 +--
.../iotdb/db/query/control/QueryTokenManager.java | 8 +++--
3 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index 409b369..62d806c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -37,12 +37,12 @@ public class OpenedFilePathsManager {
/**
* Map<jobId, Set<filePaths>>
*/
- private ConcurrentHashMap<Long, Set<String>> closedFilePathsMap;
- private ConcurrentHashMap<Long, Set<String>> unclosedFilePathsMap;
+ private ConcurrentHashMap<Long, Set<String>> sealedFilePathsMap;
+ private ConcurrentHashMap<Long, Set<String>> unsealedFilePathsMap;
private OpenedFilePathsManager() {
- closedFilePathsMap = new ConcurrentHashMap<>();
- unclosedFilePathsMap = new ConcurrentHashMap<>();
+ sealedFilePathsMap = new ConcurrentHashMap<>();
+ unsealedFilePathsMap = new ConcurrentHashMap<>();
}
public static OpenedFilePathsManager getInstance() {
@@ -53,12 +53,12 @@ public class OpenedFilePathsManager {
* Set job id for current request thread. When a query request is created
firstly, this method must be invoked.
*/
public void addJobId(long jobId) {
- closedFilePathsMap.put(jobId, new HashSet<>());
- unclosedFilePathsMap.put(jobId, new HashSet<>());
+ sealedFilePathsMap.computeIfAbsent(jobId, x -> new HashSet<>());
+ unsealedFilePathsMap.computeIfAbsent(jobId, x -> new HashSet<>());
}
/**
- * Add the unique file paths to closedFilePathsMap and unclosedFilePathsMap.
+ * Add the unique file paths to sealedFilePathsMap and unsealedFilePathsMap.
*/
public void addUsedFilesForGivenJob(long jobId, QueryDataSource dataSource) {
for (TsFileResource tsFileResource :
dataSource.getSeqDataSource().getSealedTsFiles()) {
@@ -83,30 +83,30 @@ public class OpenedFilePathsManager {
* Whenever the jdbc request is closed normally or abnormally, this method
must be invoked. All file paths used by
* this jdbc request must be cleared and thus the usage reference must be
decreased.
*/
- public void removeUsedFilesForGivenJob(long jobId) {
- for (String filePath : closedFilePathsMap.get(jobId)) {
+ void removeUsedFilesForGivenJob(long jobId) {
+ for (String filePath : sealedFilePathsMap.get(jobId)) {
FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
false);
}
- closedFilePathsMap.remove(jobId);
- for (String filePath : unclosedFilePathsMap.get(jobId)) {
+ sealedFilePathsMap.remove(jobId);
+ for (String filePath : unsealedFilePathsMap.get(jobId)) {
FileReaderManager.getInstance().decreaseFileReaderReference(filePath,
true);
}
- unclosedFilePathsMap.remove(jobId);
+ unsealedFilePathsMap.remove(jobId);
}
/**
* Increase the usage reference of filePath of job id. Before the invoking
of this method,
* <code>this.setJobIdForCurrentRequestThread</code> has been invoked,
- * so <code>closedFilePathsMap.get(jobId)</code> or <code>unclosedFilePathsMap.get(jobId)</code>
+ * so <code>sealedFilePathsMap.get(jobId)</code> or <code>unsealedFilePathsMap.get(jobId)</code>
* must not return null.
*/
- void addFilePathToMap(long jobId, String filePath, boolean isClosed) {
- ConcurrentHashMap<Long, Set<String>> pathMap = !isClosed ? unclosedFilePathsMap :
- closedFilePathsMap;
+ void addFilePathToMap(long jobId, String filePath, boolean isSealed) {
+ ConcurrentHashMap<Long, Set<String>> pathMap = !isSealed ? unsealedFilePathsMap :
+ sealedFilePathsMap;
//TODO this is not an atomic operation, is there concurrent problem?
if (!pathMap.get(jobId).contains(filePath)) {
pathMap.get(jobId).add(filePath);
- FileReaderManager.getInstance().increaseFileReaderReference(filePath, isClosed);
+ FileReaderManager.getInstance().increaseFileReaderReference(filePath, isSealed);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
index 20415c6..3151745 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
@@ -32,7 +32,7 @@ public class QuerySession {
* will always be maintained until the request is closed. In each job, the
unique file will be
* only opened once to avoid too many opened files error.
*/
- private AtomicLong jobIdGenerator = new AtomicLong();
+ private static AtomicLong jobIdGenerator = new AtomicLong();
private QuerySession() {
this.jobId = new ThreadLocal<Long>(){
@@ -40,8 +40,6 @@ public class QuerySession {
protected Long initialValue() {
super.initialValue();
long id = jobIdGenerator.incrementAndGet();
- OpenedFilePathsManager.getInstance().addJobId(id);
- QueryTokenManager.getInstance().addJobId(id);
return id;
}
};
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
index 44ee763..7a566e8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
@@ -94,7 +94,7 @@ public class QueryTokenManager {
* must be invoked.
*/
public void addJobId(long jobId) {
- queryTokensMap.put(jobId, new ConcurrentHashMap<>());
+ queryTokensMap.computeIfAbsent(jobId, x -> new ConcurrentHashMap<>());
}
/**
@@ -131,6 +131,10 @@ public class QueryTokenManager {
* query tokens created by this jdbc request must be cleared.
*/
public void endQueryForGivenJob(long jobId) throws FileNodeManagerException {
+ if (queryTokensMap.get(jobId) == null) {
+ // no resource need to be released.
+ return;
+ }
for (Map.Entry<String, List<Integer>> entry :
queryTokensMap.get(jobId).entrySet()) {
for (int token : entry.getValue()) {
FileNodeManager.getInstance().endQuery(entry.getKey(), token);
@@ -152,7 +156,7 @@ public class QueryTokenManager {
}
private void putQueryTokenForCurrentRequestThread(long jobId, String
deviceId, int queryToken) {
- queryTokensMap.get(jobId).computeIfPresent(deviceId, (x, y) -> new ArrayList<>()).add(queryToken);
+ queryTokensMap.get(jobId).computeIfAbsent(deviceId, x -> new ArrayList<>()).add(queryToken);
}
private static class QueryTokenManagerHelper {