This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch threadlocal_for_query
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/threadlocal_for_query by this push:
     new 65ab1e6  fix multiply remove jobId
65ab1e6 is described below

commit 65ab1e6763c1de41f47840b7b16a6cfe5108da26
Author: xiangdong huang <[email protected]>
AuthorDate: Fri Mar 29 14:33:11 2019 +0800

    fix multiply remove jobId
---
 .../db/query/control/OpenedFilePathsManager.java   | 34 +++++++++++-----------
 .../iotdb/db/query/control/QuerySession.java       |  4 +--
 .../iotdb/db/query/control/QueryTokenManager.java  |  8 +++--
 3 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
index 409b369..62d806c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/OpenedFilePathsManager.java
@@ -37,12 +37,12 @@ public class OpenedFilePathsManager {
   /**
    * Map<jobId, Set<filePaths>>
    */
-  private ConcurrentHashMap<Long, Set<String>> closedFilePathsMap;
-  private ConcurrentHashMap<Long, Set<String>> unclosedFilePathsMap;
+  private ConcurrentHashMap<Long, Set<String>> sealedFilePathsMap;
+  private ConcurrentHashMap<Long, Set<String>> unsealedFilePathsMap;
 
   private OpenedFilePathsManager() {
-    closedFilePathsMap = new ConcurrentHashMap<>();
-    unclosedFilePathsMap = new ConcurrentHashMap<>();
+    sealedFilePathsMap = new ConcurrentHashMap<>();
+    unsealedFilePathsMap = new ConcurrentHashMap<>();
   }
 
   public static OpenedFilePathsManager getInstance() {
@@ -53,12 +53,12 @@ public class OpenedFilePathsManager {
    * Set job id for current request thread. When a query request is created firstly, this method must be invoked.
    */
   public void addJobId(long jobId) {
-    closedFilePathsMap.put(jobId, new HashSet<>());
-    unclosedFilePathsMap.put(jobId, new HashSet<>());
+    sealedFilePathsMap.computeIfAbsent(jobId, x -> new HashSet<>());
+    unsealedFilePathsMap.computeIfAbsent(jobId, x -> new HashSet<>());
   }
 
   /**
-   * Add the unique file paths to closedFilePathsMap and unclosedFilePathsMap.
+   * Add the unique file paths to sealedFilePathsMap and unsealedFilePathsMap.
    */
   public void addUsedFilesForGivenJob(long jobId, QueryDataSource dataSource) {
     for (TsFileResource tsFileResource : dataSource.getSeqDataSource().getSealedTsFiles()) {
@@ -83,30 +83,30 @@ public class OpenedFilePathsManager {
    * Whenever the jdbc request is closed normally or abnormally, this method must be invoked. All file paths used by
    * this jdbc request must be cleared and thus the usage reference must be decreased.
    */
-  public void removeUsedFilesForGivenJob(long jobId) {
-      for (String filePath : closedFilePathsMap.get(jobId)) {
+  void removeUsedFilesForGivenJob(long jobId) {
+      for (String filePath : sealedFilePathsMap.get(jobId)) {
         FileReaderManager.getInstance().decreaseFileReaderReference(filePath, false);
       }
-      closedFilePathsMap.remove(jobId);
-      for (String filePath : unclosedFilePathsMap.get(jobId)) {
+      sealedFilePathsMap.remove(jobId);
+      for (String filePath : unsealedFilePathsMap.get(jobId)) {
         FileReaderManager.getInstance().decreaseFileReaderReference(filePath, true);
       }
-      unclosedFilePathsMap.remove(jobId);
+      unsealedFilePathsMap.remove(jobId);
   }
 
   /**
    * Increase the usage reference of filePath of job id. Before the invoking of this method,
    * <code>this.setJobIdForCurrentRequestThread</code> has been invoked,
-   * so <code>closedFilePathsMap.get(jobId)</code> or <code>unclosedFilePathsMap.get(jobId)</code>
+   * so <code>sealedFilePathsMap.get(jobId)</code> or <code>unsealedFilePathsMap.get(jobId)</code>
    * must not return null.
    */
-  void addFilePathToMap(long jobId, String filePath, boolean isClosed) {
-    ConcurrentHashMap<Long, Set<String>> pathMap = !isClosed ? unclosedFilePathsMap :
-        closedFilePathsMap;
+  void addFilePathToMap(long jobId, String filePath, boolean isSealed) {
+    ConcurrentHashMap<Long, Set<String>> pathMap = !isSealed ? unsealedFilePathsMap :
+        sealedFilePathsMap;
     //TODO this is not an atomic operation, is there concurrent problem?
     if (!pathMap.get(jobId).contains(filePath)) {
       pathMap.get(jobId).add(filePath);
-      FileReaderManager.getInstance().increaseFileReaderReference(filePath, isClosed);
+      FileReaderManager.getInstance().increaseFileReaderReference(filePath, isSealed);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
index 20415c6..3151745 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QuerySession.java
@@ -32,7 +32,7 @@ public class QuerySession {
    * will always be maintained until the request is closed. In each job, the unique file will be
    * only opened once to avoid too many opened files error.
    */
-  private AtomicLong jobIdGenerator = new AtomicLong();
+  private static AtomicLong jobIdGenerator = new AtomicLong();
 
   private QuerySession() {
     this.jobId = new ThreadLocal<Long>(){
@@ -40,8 +40,6 @@ public class QuerySession {
       protected Long initialValue() {
         super.initialValue();
         long id = jobIdGenerator.incrementAndGet();
-        OpenedFilePathsManager.getInstance().addJobId(id);
-        QueryTokenManager.getInstance().addJobId(id);
         return id;
       }
     };
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
index 44ee763..7a566e8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryTokenManager.java
@@ -94,7 +94,7 @@ public class QueryTokenManager {
    * must be invoked.
    */
   public void addJobId(long jobId) {
-    queryTokensMap.put(jobId, new ConcurrentHashMap<>());
+    queryTokensMap.computeIfAbsent(jobId, x -> new ConcurrentHashMap<>());
   }
 
   /**
@@ -131,6 +131,10 @@ public class QueryTokenManager {
    * query tokens created by this jdbc request must be cleared.
    */
   public void endQueryForGivenJob(long jobId) throws FileNodeManagerException {
+    if (queryTokensMap.get(jobId) == null) {
+      // no resource need to be released.
+      return;
+    }
       for (Map.Entry<String, List<Integer>> entry : queryTokensMap.get(jobId).entrySet()) {
         for (int token : entry.getValue()) {
           FileNodeManager.getInstance().endQuery(entry.getKey(), token);
@@ -152,7 +156,7 @@ public class QueryTokenManager {
   }
 
   private void putQueryTokenForCurrentRequestThread(long jobId, String deviceId, int queryToken) {
-    queryTokensMap.get(jobId).computeIfPresent(deviceId, (x, y) -> new ArrayList<>()).add(queryToken);
+    queryTokensMap.get(jobId).computeIfAbsent(deviceId, x -> new ArrayList<>()).add(queryToken);
   }
 
   private static class QueryTokenManagerHelper {

Reply via email to