This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel_0.12_debug_compaction_stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel_0.12_debug_compaction_stop 
by this push:
     new 66a3f40  try to fix
66a3f40 is described below

commit 66a3f40915fbf43f1db2301e43a36dce4ff799c8
Author: LebronAl <[email protected]>
AuthorDate: Wed Nov 10 18:28:31 2021 +0800

    try to fix
---
 .../apache/iotdb/cluster/server/ClientServer.java  |  6 +++--
 .../cluster/server/service/DataSyncService.java    | 10 ++++++--
 .../iotdb/db/query/control/FileReaderManager.java  | 29 +++++++++++-----------
 .../iotdb/db/query/control/QueryFileManager.java   |  9 ++++---
 .../db/query/control/FileReaderManagerTest.java    |  4 +--
 5 files changed, 34 insertions(+), 24 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 107624f..8582c3b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -292,7 +292,7 @@ public class ClientServer extends TSServiceImpl {
   protected QueryContext genQueryContext(long queryId, boolean debug) {
     RemoteQueryContext context = new RemoteQueryContext(queryId, debug);
     queryContextMap.put(queryId, context);
-    logger.warn("regist queryid {}", queryId);
+    logger.warn("regist remote queryid {}", queryId);
     return context;
   }
 
@@ -309,7 +309,7 @@ public class ClientServer extends TSServiceImpl {
     // release resources remotely
     RemoteQueryContext context = queryContextMap.remove(queryId);
     if (context != null) {
-      logger.warn("release queryid {}", queryId);
+      logger.warn("release remote queryid {}", queryId);
       // release the resources in every queried node
       for (Entry<Node, Set<Node>> headerEntry : 
context.getQueriedNodesMap().entrySet()) {
         Node header = headerEntry.getKey();
@@ -341,6 +341,8 @@ public class ClientServer extends TSServiceImpl {
           }
         }
       }
+    } else {
+      logger.warn("release remote queryid null {}", queryId);
     }
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 8ff3c35..c3903b7 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -196,7 +196,10 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   @Override
   public long querySingleSeries(SingleSeriesQueryRequest request) throws 
TException {
     try {
-      return 
dataGroupMember.getLocalQueryExecutor().querySingleSeries(request);
+      long result = 
dataGroupMember.getLocalQueryExecutor().querySingleSeries(request);
+      logger.error(
+          "querySingleSeriesByTimestamp: queryId: {}, readerId: {}", 
request.getQueryId(), result);
+      return result;
     } catch (Exception e) {
       throw new TException(e);
     }
@@ -214,7 +217,10 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   @Override
   public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) 
throws TException {
     try {
-      return 
dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request);
+      long result = 
dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request);
+      logger.error(
+          "querySingleSeriesByTimestamp: queryId: {}, readerId: {}", 
request.getQueryId(), result);
+      return result;
     } catch (Exception e) {
       throw new TException(e);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java 
b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 3fc199f..15463e3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -32,12 +32,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * FileReaderManager is a singleton, which is used to manage all file 
readers(opened file streams)
@@ -66,12 +67,12 @@ public class FileReaderManager implements IService {
    * the key of closedFileReaderMap is the file path and the value of 
closedFileReaderMap is the
    * file's reference count.
    */
-  private Map<String, AtomicInteger> closedReferenceMap;
+  private Map<String, Set<Long>> closedReferenceMap;
   /**
    * the key of unclosedFileReaderMap is the file path and the value of 
unclosedFileReaderMap is the
    * file's reference count.
    */
-  private Map<String, AtomicInteger> unclosedReferenceMap;
+  private Map<String, Set<Long>> unclosedReferenceMap;
 
   private ScheduledExecutorService executorService;
 
@@ -119,14 +120,14 @@ public class FileReaderManager implements IService {
   }
 
   private void clearMap(
-      Map<String, TsFileSequenceReader> readerMap, Map<String, AtomicInteger> 
refMap) {
+      Map<String, TsFileSequenceReader> readerMap, Map<String, Set<Long>> 
refMap) {
     Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = 
readerMap.entrySet().iterator();
     while (iterator.hasNext()) {
       Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
       TsFileSequenceReader reader = entry.getValue();
-      AtomicInteger refAtom = refMap.get(entry.getKey());
+      Set<Long> refAtom = refMap.get(entry.getKey());
 
-      if (refAtom != null && refAtom.get() == 0) {
+      if (refAtom != null && refAtom.size() == 0) {
         try {
           reader.close();
         } catch (IOException e) {
@@ -191,17 +192,17 @@ public class FileReaderManager implements IService {
    * Increase the reference count of the reader specified by filePath. Only 
when the reference count
    * of a reader equals zero, the reader can be closed and removed.
    */
-  void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
+  void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed, 
long queryId) {
     tsFile.readLock("fileReaderReference");
     synchronized (this) {
       if (!isClosed) {
         unclosedReferenceMap
-            .computeIfAbsent(tsFile.getTsFilePath(), k -> new AtomicInteger())
-            .getAndIncrement();
+            .computeIfAbsent(tsFile.getTsFilePath(), k -> new HashSet<>())
+            .add(queryId);
       } else {
         closedReferenceMap
-            .computeIfAbsent(tsFile.getTsFilePath(), k -> new AtomicInteger())
-            .getAndIncrement();
+            .computeIfAbsent(tsFile.getTsFilePath(), k -> new HashSet<>())
+            .add(queryId);
       }
     }
   }
@@ -210,12 +211,12 @@ public class FileReaderManager implements IService {
    * Decrease the reference count of the reader specified by filePath. This 
method is latch-free.
    * Only when the reference count of a reader equals zero, the reader can be 
closed and removed.
    */
-  void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
+  void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed, 
long queryId) {
     synchronized (this) {
       if (!isClosed && 
unclosedReferenceMap.containsKey(tsFile.getTsFilePath())) {
-        unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+        unclosedReferenceMap.get(tsFile.getTsFilePath()).remove(queryId);
       } else if (closedReferenceMap.containsKey(tsFile.getTsFilePath())) {
-        closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+        closedReferenceMap.get(tsFile.getTsFilePath()).remove(queryId);
       }
     }
     tsFile.readUnlock("fileReaderReference");
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java 
b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index d75bbe3..b7604d6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -77,7 +77,8 @@ public class QueryFileManager {
             !isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
         // This resource may be removed by other threads of this query.
         if (pathMap.get(queryId).remove(tsFileResource)) {
-          
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, 
isClosed);
+          FileReaderManager.getInstance()
+              .decreaseFileReaderReference(tsFileResource, isClosed, queryId);
         }
         iterator.remove();
       }
@@ -94,7 +95,7 @@ public class QueryFileManager {
         queryId,
         (k, v) -> {
           for (TsFileResource tsFile : v) {
-            
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
+            
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true, 
queryId);
           }
           return null;
         });
@@ -102,7 +103,7 @@ public class QueryFileManager {
         queryId,
         (k, v) -> {
           for (TsFileResource tsFile : v) {
-            
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
+            
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false, 
queryId);
           }
           return null;
         });
@@ -119,7 +120,7 @@ public class QueryFileManager {
     // TODO this is not an atomic operation, is there concurrent problem?
     if (!pathMap.get(queryId).contains(tsFile)) {
       pathMap.get(queryId).add(tsFile);
-      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
isClosed);
+      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, 
isClosed, queryId);
     }
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index ea5b75b..5b6cfd5 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -88,7 +88,7 @@ public class FileReaderManagerTest {
                 }
                 for (int i = 1; i <= 6; i++) {
                   TsFileResource tsFile = tsFileResources[i];
-                  manager.decreaseFileReaderReference(tsFile, false);
+                  manager.decreaseFileReaderReference(tsFile, false, 1L);
                 }
 
               } catch (IOException e) {
@@ -111,7 +111,7 @@ public class FileReaderManagerTest {
                 }
                 for (int i = 4; i <= MAX_FILE_SIZE; i++) {
                   TsFileResource tsFile = tsFileResources[i];
-                  manager.decreaseFileReaderReference(tsFile, false);
+                  manager.decreaseFileReaderReference(tsFile, false, 2L);
                 }
 
               } catch (IOException e) {

Reply via email to