This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch jira1306_011
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/jira1306_011 by this push:
new e0c34d8 skip flushing the awaiting memtable
e0c34d8 is described below
commit e0c34d834f002e9f606764bcd55c2e8398a9c2ef
Author: HTHou <[email protected]>
AuthorDate: Wed Apr 21 13:56:03 2021 +0800
skip flushing the awaiting memtable
---
.../iotdb/db/engine/storagegroup/StorageGroupProcessor.java | 3 ++-
.../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java | 10 ++++++++--
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index c6afa3f..19a0e24 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1014,7 +1014,8 @@ public class StorageGroupProcessor {
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor
tsFileProcessor) {
writeLock();
try {
- if (!closingSequenceTsFileProcessor.contains(tsFileProcessor)
+ if (!tsFileProcessor.isAwaiting()
+ && !closingSequenceTsFileProcessor.contains(tsFileProcessor)
&& !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1ab2b39..4bcbf50 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
-import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -117,6 +116,7 @@ public class TsFileProcessor {
private WriteLogNode logNode;
private final boolean sequence;
private long totalMemTableSize;
+ private boolean isAwaiting = false;
private boolean shouldFlush = false;
private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get
flushQueryLock write lock";
@@ -366,11 +366,13 @@ public class TsFileProcessor {
long startTime = System.currentTimeMillis();
while (SystemInfo.getInstance().isRejected()) {
try {
+ isAwaiting = true;
storageGroupInfo.getStorageGroupProcessor().rejectConditionAwait();
+ isAwaiting = false;
if (System.currentTimeMillis() - startTime
> config.getMaxWaitingTimeWhenInsertBlocked()) {
throw new WriteProcessRejectException(
- "System rejected over " +
config.getMaxWaitingTimeWhenInsertBlocked() + "ms");
+ "System rejected over " + (System.currentTimeMillis() -
startTime) + "ms");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -450,6 +452,10 @@ public class TsFileProcessor {
return false;
}
+ public boolean isAwaiting() {
+ return isAwaiting;
+ }
+
private long getMemtableSizeThresholdBasedOnSeriesNum() {
return config.getMemtableSizeThreshold();
}