This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_flush_close_file_error
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9302734bfdc2bf0092abaf3f6a5bb406864a4f62
Author: qiaojialin <[email protected]>
AuthorDate: Tue Mar 17 10:39:25 2020 +0800

    fix flush
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  8 ++--
 .../iotdb/db/engine/flush/TsFileFlushPolicy.java   |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 54 ++++++++++++----------
 .../db/engine/cache/DeviceMetaDataCacheTest.java   |  6 +--
 .../storagegroup/StorageGroupProcessorTest.java    | 28 +++++------
 .../iotdb/db/engine/storagegroup/TTLTest.java      | 10 ++--
 6 files changed, 55 insertions(+), 53 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 93df289..6b742d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -303,9 +303,7 @@ public class StorageEngine implements IService {
   public void syncCloseAllProcessor() {
     logger.info("Start closing all storage group processor");
     for (StorageGroupProcessor processor : processorMap.values()) {
-      processor.waitForAllCurrentTsFileProcessorsClosed();
-      //TODO do we need to wait for all merging tasks to be finished here?
-      processor.closeAllResources();
+      processor.syncCloseAllTsFileProcessors();
     }
   }
 
@@ -321,13 +319,13 @@ public class StorageEngine implements IService {
           // to avoid concurrent modification problem, we need a new array list
           for (TsFileProcessor tsfileProcessor : new ArrayList<>(
               processor.getWorkSequenceTsFileProcessors())) {
-            processor.moveOneWorkProcessorToClosingList(true, tsfileProcessor);
+            processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
           }
         } else {
           // to avoid concurrent modification problem, we need a new array list
           for (TsFileProcessor tsfileProcessor : new ArrayList<>(
               processor.getWorkUnsequenceTsFileProcessor())) {
-            processor.moveOneWorkProcessorToClosingList(false, tsfileProcessor);
+            processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
           }
         }
       } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index 0b3b61b..7df2508 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -45,7 +45,7 @@ public interface TsFileFlushPolicy {
           tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
 
       if (tsFileProcessor.shouldClose()) {
-        storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq, tsFileProcessor);
+        storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
       } else {
         tsFileProcessor.asyncFlush();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 72e05b1..03ac594 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -94,7 +94,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
  * (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
  * shouldClose())<br/>
  * <p>
- * (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
+ * (2) someone calls syncCloseAllTsFileProcessors(). (up to now, only flush command from
  * cli will call this method)<br/>
  * <p>
  * UnSequence data has the similar process as above.
@@ -745,7 +745,7 @@ public class StorageGroupProcessor {
               tsFileProcessorTreeMap.size(),
               IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / 2,
               storageGroupName);
-          moveOneWorkProcessorToClosingList(sequence, processorEntry.getValue());
+          asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
         }
 
         // build new processor
@@ -820,8 +820,7 @@ public class StorageGroupProcessor {
   /**
    * thread-safety should be ensured by caller
    */
-  public void moveOneWorkProcessorToClosingList(boolean sequence,
-      TsFileProcessor tsFileProcessor) {
+  public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
     //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
     if (sequence) {
@@ -852,8 +851,8 @@ public class StorageGroupProcessor {
    */
   public void deleteFolder(String systemDir) {
     logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
-    waitForAllCurrentTsFileProcessorsClosed();
     writeLock();
+    syncCloseAllTsFileProcessors();
     try {
       File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
       if (storageGroupFolder.exists()) {
@@ -885,7 +884,8 @@ public class StorageGroupProcessor {
 
   public void syncDeleteDataFiles() {
     logger.info("{} will close all files for deleting data files", storageGroupName);
-    waitForAllCurrentTsFileProcessorsClosed();
+    writeLock();
+    syncCloseAllTsFileProcessors();
     //normally, mergingModification is just need to be closed by after a merge task is finished.
     //we close it here just for IT test.
     if (this.mergingModification != null) {
@@ -896,7 +896,6 @@ public class StorageGroupProcessor {
       }
 
     }
-    writeLock();
     try {
       closeAllResources();
       List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
@@ -995,39 +994,44 @@ public class StorageGroupProcessor {
   /**
    * This method will be blocked until all tsfile processors are closed.
    */
-  public void waitForAllCurrentTsFileProcessorsClosed() {
-    synchronized (closeStorageGroupCondition) {
-      try {
-        putAllWorkingTsFileProcessorIntoClosingList();
-        long startTime = System.currentTimeMillis();
-        while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
-            .isEmpty()) {
-          closeStorageGroupCondition.wait(60_000);
-          if (System.currentTimeMillis() - startTime > 60_000) {
-            logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
-                (System.currentTimeMillis() - startTime)/1000);
+  public void syncCloseAllTsFileProcessors() {
+    writeLock();
+    try {
+      synchronized (closeStorageGroupCondition) {
+        try {
+          asyncCloseAllTsFileProcessors();
+          long startTime = System.currentTimeMillis();
+          while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
+              .isEmpty()) {
+            closeStorageGroupCondition.wait(60_000);
+            if (System.currentTimeMillis() - startTime > 60_000) {
+              logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
+                  (System.currentTimeMillis() - startTime)/1000);
+            }
           }
+        } catch (InterruptedException e) {
+          logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
+              + "group {}", storageGroupName, e);
         }
-      } catch (InterruptedException e) {
-        logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
-            + "group {}", storageGroupName, e);
       }
+    } finally {
+      writeUnlock();
     }
   }
 
-  public void putAllWorkingTsFileProcessorIntoClosingList() {
+  public void asyncCloseAllTsFileProcessors() {
     writeLock();
     try {
       logger.info("async force close all files in storage group: {}", storageGroupName);
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor : new ArrayList<>(
           workSequenceTsFileProcessors.values())) {
-        moveOneWorkProcessorToClosingList(true, tsFileProcessor);
+        asyncCloseOneTsFileProcessor(true, tsFileProcessor);
       }
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor : new ArrayList<>(
           workUnsequenceTsFileProcessors.values())) {
-        moveOneWorkProcessorToClosingList(false, tsFileProcessor);
+        asyncCloseOneTsFileProcessor(false, tsFileProcessor);
       }
     } finally {
       writeUnlock();
@@ -1370,7 +1374,7 @@ public class StorageGroupProcessor {
       }
       logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
           fullMerge);
-      waitForAllCurrentTsFileProcessorsClosed();
+      syncCloseAllTsFileProcessors();
       if (unSequenceFileList.isEmpty() || sequenceFileTreeSet.isEmpty()) {
         logger.info("{} no files to be merged", storageGroupName);
         return;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index ff05387..2f51e18 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -106,17 +106,17 @@ public class DeviceMetaDataCacheTest {
     for (int j = 11; j <= 20; j++) {
       insertOneRecord(j, j);
     }
-    storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+    storageGroupProcessor.asyncCloseAllTsFileProcessors();
 
     for (int j = 21; j <= 30; j += 2) {
       insertOneRecord(j, 0); // will be covered when read
     }
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     for (int j = 21; j <= 30; j += 2) {
       insertOneRecord(j, j);
     }
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     insertOneRecord(2, 100);
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e86a985..ec645b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -89,7 +89,7 @@ public class StorageGroupProcessorTest {
     TSRecord record = new TSRecord(10000, deviceId);
     record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
     processor.insert(new InsertPlan(record));
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     for (int j = 1; j <= 10; j++) {
       record = new TSRecord(j, deviceId);
@@ -137,10 +137,10 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
 
@@ -178,7 +178,7 @@ public class StorageGroupProcessorTest {
     batchInsertPlan1.setRowCount(times.length);
 
     processor.insertBatch(batchInsertPlan1);
-    processor.putAllWorkingTsFileProcessorIntoClosingList();
+    processor.asyncCloseAllTsFileProcessors();
 
     BatchInsertPlan batchInsertPlan2 = new BatchInsertPlan("root.vehicle.d0", measurements,
         dataTypes);
@@ -193,8 +193,8 @@ public class StorageGroupProcessorTest {
     batchInsertPlan2.setRowCount(times.length);
 
     processor.insertBatch(batchInsertPlan2);
-    processor.putAllWorkingTsFileProcessorIntoClosingList();
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.asyncCloseAllTsFileProcessors();
+    processor.syncCloseAllTsFileProcessors();
 
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
@@ -214,18 +214,18 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     for (int j = 10; j >= 1; j--) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
         null, null);
@@ -247,18 +247,18 @@ public class StorageGroupProcessorTest {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
 
     for (int j = 10; j >= 1; j--) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(new InsertPlan(record));
-      processor.putAllWorkingTsFileProcessorIntoClosingList();
+      processor.asyncCloseAllTsFileProcessors();
     }
 
-    processor.waitForAllCurrentTsFileProcessorsClosed();
+    processor.syncCloseAllTsFileProcessors();
     processor.merge(true);
     while (mergeLock.get() == 0) {
       // wait
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index d28006b..55eb0a8 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -83,7 +83,7 @@ public class TTLTest {
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
     EnvironmentUtils.cleanEnv();
   }
 
@@ -160,7 +160,7 @@ public class TTLTest {
       insertPlan.setTime(initTime - 2000 + i);
       storageGroupProcessor.insert(insertPlan);
       if ((i + 1) % 300 == 0) {
-        storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+        storageGroupProcessor.asyncCloseAllTsFileProcessors();
       }
     }
     // unsequence data
@@ -168,7 +168,7 @@ public class TTLTest {
       insertPlan.setTime(initTime - 2000 + i);
       storageGroupProcessor.insert(insertPlan);
       if ((i + 1) % 300 == 0) {
-        storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+        storageGroupProcessor.asyncCloseAllTsFileProcessors();
       }
     }
   }
@@ -225,7 +225,7 @@ public class TTLTest {
   public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
     prepareData();
 
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     // files before ttl
     File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
@@ -335,7 +335,7 @@ public class TTLTest {
   @Test
   public void testTTLCleanFile() throws QueryProcessException {
     prepareData();
-    storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+    storageGroupProcessor.syncCloseAllTsFileProcessors();
 
     assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
     assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());

Reply via email to