This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel_0.12_debug_compaction_stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel_0.12_debug_compaction_stop
by this push:
new 66a3f40 try to fix
66a3f40 is described below
commit 66a3f40915fbf43f1db2301e43a36dce4ff799c8
Author: LebronAl <[email protected]>
AuthorDate: Wed Nov 10 18:28:31 2021 +0800
try to fix
---
.../apache/iotdb/cluster/server/ClientServer.java | 6 +++--
.../cluster/server/service/DataSyncService.java | 10 ++++++--
.../iotdb/db/query/control/FileReaderManager.java | 29 +++++++++++-----------
.../iotdb/db/query/control/QueryFileManager.java | 9 ++++---
.../db/query/control/FileReaderManagerTest.java | 4 +--
5 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 107624f..8582c3b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -292,7 +292,7 @@ public class ClientServer extends TSServiceImpl {
protected QueryContext genQueryContext(long queryId, boolean debug) {
RemoteQueryContext context = new RemoteQueryContext(queryId, debug);
queryContextMap.put(queryId, context);
- logger.warn("regist queryid {}", queryId);
+ logger.warn("regist remote queryid {}", queryId);
return context;
}
@@ -309,7 +309,7 @@ public class ClientServer extends TSServiceImpl {
// release resources remotely
RemoteQueryContext context = queryContextMap.remove(queryId);
if (context != null) {
- logger.warn("release queryid {}", queryId);
+ logger.warn("release remote queryid {}", queryId);
// release the resources in every queried node
for (Entry<Node, Set<Node>> headerEntry :
context.getQueriedNodesMap().entrySet()) {
Node header = headerEntry.getKey();
@@ -341,6 +341,8 @@ public class ClientServer extends TSServiceImpl {
}
}
}
+ } else {
+ logger.warn("release remote queryid null {}", queryId);
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 8ff3c35..c3903b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -196,7 +196,10 @@ public class DataSyncService extends BaseSyncService
implements TSDataService.If
@Override
public long querySingleSeries(SingleSeriesQueryRequest request) throws
TException {
try {
- return
dataGroupMember.getLocalQueryExecutor().querySingleSeries(request);
+ long result =
dataGroupMember.getLocalQueryExecutor().querySingleSeries(request);
+ logger.error(
+ "querySingleSeriesByTimestamp: queryId: {}, readerId: {}",
request.getQueryId(), result);
+ return result;
} catch (Exception e) {
throw new TException(e);
}
@@ -214,7 +217,10 @@ public class DataSyncService extends BaseSyncService
implements TSDataService.If
@Override
public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request)
throws TException {
try {
- return
dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request);
+ long result =
dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request);
+ logger.error(
+ "querySingleSeriesByTimestamp: queryId: {}, readerId: {}",
request.getQueryId(), result);
+ return result;
} catch (Exception e) {
throw new TException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index 3fc199f..15463e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -32,12 +32,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* FileReaderManager is a singleton, which is used to manage all file
readers(opened file streams)
@@ -66,12 +67,12 @@ public class FileReaderManager implements IService {
* the key of closedFileReaderMap is the file path and the value of
closedFileReaderMap is the
* file's reference count.
*/
- private Map<String, AtomicInteger> closedReferenceMap;
+ private Map<String, Set<Long>> closedReferenceMap;
/**
* the key of unclosedFileReaderMap is the file path and the value of
unclosedFileReaderMap is the
* file's reference count.
*/
- private Map<String, AtomicInteger> unclosedReferenceMap;
+ private Map<String, Set<Long>> unclosedReferenceMap;
private ScheduledExecutorService executorService;
@@ -119,14 +120,14 @@ public class FileReaderManager implements IService {
}
private void clearMap(
- Map<String, TsFileSequenceReader> readerMap, Map<String, AtomicInteger>
refMap) {
+ Map<String, TsFileSequenceReader> readerMap, Map<String, Set<Long>>
refMap) {
Iterator<Map.Entry<String, TsFileSequenceReader>> iterator =
readerMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
TsFileSequenceReader reader = entry.getValue();
- AtomicInteger refAtom = refMap.get(entry.getKey());
+ Set<Long> refAtom = refMap.get(entry.getKey());
- if (refAtom != null && refAtom.get() == 0) {
+ if (refAtom != null && refAtom.size() == 0) {
try {
reader.close();
} catch (IOException e) {
@@ -191,17 +192,17 @@ public class FileReaderManager implements IService {
* Increase the reference count of the reader specified by filePath. Only
when the reference count
* of a reader equals zero, the reader can be closed and removed.
*/
- void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
+ void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed,
long queryId) {
tsFile.readLock("fileReaderReference");
synchronized (this) {
if (!isClosed) {
unclosedReferenceMap
- .computeIfAbsent(tsFile.getTsFilePath(), k -> new AtomicInteger())
- .getAndIncrement();
+ .computeIfAbsent(tsFile.getTsFilePath(), k -> new HashSet<>())
+ .add(queryId);
} else {
closedReferenceMap
- .computeIfAbsent(tsFile.getTsFilePath(), k -> new AtomicInteger())
- .getAndIncrement();
+ .computeIfAbsent(tsFile.getTsFilePath(), k -> new HashSet<>())
+ .add(queryId);
}
}
}
@@ -210,12 +211,12 @@ public class FileReaderManager implements IService {
* Decrease the reference count of the reader specified by filePath. This
method is latch-free.
* Only when the reference count of a reader equals zero, the reader can be
closed and removed.
*/
- void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
+ void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed,
long queryId) {
synchronized (this) {
if (!isClosed &&
unclosedReferenceMap.containsKey(tsFile.getTsFilePath())) {
- unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+ unclosedReferenceMap.get(tsFile.getTsFilePath()).remove(queryId);
} else if (closedReferenceMap.containsKey(tsFile.getTsFilePath())) {
- closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet();
+ closedReferenceMap.get(tsFile.getTsFilePath()).remove(queryId);
}
}
tsFile.readUnlock("fileReaderReference");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index d75bbe3..b7604d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -77,7 +77,8 @@ public class QueryFileManager {
!isClosed ? unsealedFilePathsMap : sealedFilePathsMap;
// This resource may be removed by other threads of this query.
if (pathMap.get(queryId).remove(tsFileResource)) {
-
FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource,
isClosed);
+ FileReaderManager.getInstance()
+ .decreaseFileReaderReference(tsFileResource, isClosed, queryId);
}
iterator.remove();
}
@@ -94,7 +95,7 @@ public class QueryFileManager {
queryId,
(k, v) -> {
for (TsFileResource tsFile : v) {
-
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
+
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true,
queryId);
}
return null;
});
@@ -102,7 +103,7 @@ public class QueryFileManager {
queryId,
(k, v) -> {
for (TsFileResource tsFile : v) {
-
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
+
FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false,
queryId);
}
return null;
});
@@ -119,7 +120,7 @@ public class QueryFileManager {
// TODO this is not an atomic operation, is there concurrent problem?
if (!pathMap.get(queryId).contains(tsFile)) {
pathMap.get(queryId).add(tsFile);
- FileReaderManager.getInstance().increaseFileReaderReference(tsFile,
isClosed);
+ FileReaderManager.getInstance().increaseFileReaderReference(tsFile,
isClosed, queryId);
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index ea5b75b..5b6cfd5 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -88,7 +88,7 @@ public class FileReaderManagerTest {
}
for (int i = 1; i <= 6; i++) {
TsFileResource tsFile = tsFileResources[i];
- manager.decreaseFileReaderReference(tsFile, false);
+ manager.decreaseFileReaderReference(tsFile, false, 1L);
}
} catch (IOException e) {
@@ -111,7 +111,7 @@ public class FileReaderManagerTest {
}
for (int i = 4; i <= MAX_FILE_SIZE; i++) {
TsFileResource tsFile = tsFileResources[i];
- manager.decreaseFileReaderReference(tsFile, false);
+ manager.decreaseFileReaderReference(tsFile, false, 2L);
}
} catch (IOException e) {