This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_flush_close_file_error in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9302734bfdc2bf0092abaf3f6a5bb406864a4f62 Author: qiaojialin <[email protected]> AuthorDate: Tue Mar 17 10:39:25 2020 +0800 fix flush --- .../org/apache/iotdb/db/engine/StorageEngine.java | 8 ++-- .../iotdb/db/engine/flush/TsFileFlushPolicy.java | 2 +- .../engine/storagegroup/StorageGroupProcessor.java | 54 ++++++++++++---------- .../db/engine/cache/DeviceMetaDataCacheTest.java | 6 +-- .../storagegroup/StorageGroupProcessorTest.java | 28 +++++------ .../iotdb/db/engine/storagegroup/TTLTest.java | 10 ++-- 6 files changed, 55 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 93df289..6b742d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -303,9 +303,7 @@ public class StorageEngine implements IService { public void syncCloseAllProcessor() { logger.info("Start closing all storage group processor"); for (StorageGroupProcessor processor : processorMap.values()) { - processor.waitForAllCurrentTsFileProcessorsClosed(); - //TODO do we need to wait for all merging tasks to be finished here? - processor.closeAllResources(); + processor.syncCloseAllTsFileProcessors(); } } @@ -321,13 +319,13 @@ public class StorageEngine implements IService { // to avoid concurrent modification problem, we need a new array list for (TsFileProcessor tsfileProcessor : new ArrayList<>( processor.getWorkSequenceTsFileProcessors())) { - processor.moveOneWorkProcessorToClosingList(true, tsfileProcessor); + processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor); } } else { // to avoid concurrent modification problem, we need a new array list for (TsFileProcessor tsfileProcessor : new ArrayList<>( processor.getWorkUnsequenceTsFileProcessor())) { - processor.moveOneWorkProcessorToClosingList(false, tsfileProcessor); + processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor); } } } finally { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java index 0b3b61b..7df2508 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java @@ -45,7 +45,7 @@ public interface TsFileFlushPolicy { tsFileProcessor.getTsFileResource().getFile().getAbsolutePath()); if (tsFileProcessor.shouldClose()) { - storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq, tsFileProcessor); + storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor); } else { tsFileProcessor.asyncFlush(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 72e05b1..03ac594 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -94,7 +94,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF * (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or * shouldClose())<br/> * <p> - * (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from + * (2) someone calls syncCloseAllTsFileProcessors(). (up to now, only flush command from * cli will call this method)<br/> * <p> * UnSequence data has the similar process as above. @@ -745,7 +745,7 @@ public class StorageGroupProcessor { tsFileProcessorTreeMap.size(), IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / 2, storageGroupName); - moveOneWorkProcessorToClosingList(sequence, processorEntry.getValue()); + asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue()); } // build new processor @@ -820,8 +820,7 @@ public class StorageGroupProcessor { /** * thread-safety should be ensured by caller */ - public void moveOneWorkProcessorToClosingList(boolean sequence, - TsFileProcessor tsFileProcessor) { + public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed. //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes. if (sequence) { @@ -852,8 +851,8 @@ public class StorageGroupProcessor { */ public void deleteFolder(String systemDir) { logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir); - waitForAllCurrentTsFileProcessorsClosed(); writeLock(); + syncCloseAllTsFileProcessors(); try { File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName); if (storageGroupFolder.exists()) { @@ -885,7 +884,8 @@ public class StorageGroupProcessor { public void syncDeleteDataFiles() { logger.info("{} will close all files for deleting data files", storageGroupName); - waitForAllCurrentTsFileProcessorsClosed(); + writeLock(); + syncCloseAllTsFileProcessors(); //normally, mergingModification is just need to be closed by after a merge task is finished. //we close it here just for IT test. if (this.mergingModification != null) { @@ -896,7 +896,6 @@ public class StorageGroupProcessor { } } - writeLock(); try { closeAllResources(); List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders(); @@ -995,39 +994,44 @@ public class StorageGroupProcessor { /** * This method will be blocked until all tsfile processors are closed. */ - public void waitForAllCurrentTsFileProcessorsClosed() { - synchronized (closeStorageGroupCondition) { - try { - putAllWorkingTsFileProcessorIntoClosingList(); - long startTime = System.currentTimeMillis(); - while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor - .isEmpty()) { - closeStorageGroupCondition.wait(60_000); - if (System.currentTimeMillis() - startTime > 60_000) { - logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName, - (System.currentTimeMillis() - startTime)/1000); + public void syncCloseAllTsFileProcessors() { + writeLock(); + try { + synchronized (closeStorageGroupCondition) { + try { + asyncCloseAllTsFileProcessors(); + long startTime = System.currentTimeMillis(); + while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor + .isEmpty()) { + closeStorageGroupCondition.wait(60_000); + if (System.currentTimeMillis() - startTime > 60_000) { + logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName, + (System.currentTimeMillis() - startTime)/1000); + } } + } catch (InterruptedException e) { + logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage " + + "group {}", storageGroupName, e); } - } catch (InterruptedException e) { - logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage " - + "group {}", storageGroupName, e); } + } finally { + writeUnlock(); } } - public void putAllWorkingTsFileProcessorIntoClosingList() { + public void asyncCloseAllTsFileProcessors() { writeLock(); try { logger.info("async force close all files in storage group: {}", storageGroupName); // to avoid concurrent modification problem, we need a new array list for (TsFileProcessor tsFileProcessor : new ArrayList<>( workSequenceTsFileProcessors.values())) { - moveOneWorkProcessorToClosingList(true, tsFileProcessor); + asyncCloseOneTsFileProcessor(true, tsFileProcessor); } // to avoid concurrent modification problem, we need a new array list for (TsFileProcessor tsFileProcessor : new ArrayList<>( workUnsequenceTsFileProcessors.values())) { - moveOneWorkProcessorToClosingList(false, tsFileProcessor); + asyncCloseOneTsFileProcessor(false, tsFileProcessor); } } finally { writeUnlock(); @@ -1370,7 +1374,7 @@ public class StorageGroupProcessor { } logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName, fullMerge); - waitForAllCurrentTsFileProcessorsClosed(); + syncCloseAllTsFileProcessors(); if (unSequenceFileList.isEmpty() || sequenceFileTreeSet.isEmpty()) { logger.info("{} no files to be merged", storageGroupName); return; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java index ff05387..2f51e18 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java @@ -106,17 +106,17 @@ public class DeviceMetaDataCacheTest { for (int j = 11; j <= 20; j++) { insertOneRecord(j, j); } - storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList(); + storageGroupProcessor.asyncCloseAllTsFileProcessors(); for (int j = 21; j <= 30; j += 2) { insertOneRecord(j, 0); // will be covered when read } - storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed(); + storageGroupProcessor.syncCloseAllTsFileProcessors(); for (int j = 21; j <= 30; j += 2) { insertOneRecord(j, j); } - storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed(); + storageGroupProcessor.syncCloseAllTsFileProcessors(); insertOneRecord(2, 100); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index e86a985..ec645b3 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -89,7 +89,7 @@ public class StorageGroupProcessorTest { TSRecord record = new TSRecord(10000, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); processor.insert(new InsertPlan(record)); - processor.waitForAllCurrentTsFileProcessorsClosed(); + processor.syncCloseAllTsFileProcessors(); for (int j = 1; j <= 10; j++) { record = new TSRecord(j, deviceId); @@ -137,10 +137,10 @@ public class StorageGroupProcessorTest { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); processor.insert(new InsertPlan(record)); - processor.putAllWorkingTsFileProcessorIntoClosingList(); + processor.asyncCloseAllTsFileProcessors(); } - processor.waitForAllCurrentTsFileProcessorsClosed(); + processor.syncCloseAllTsFileProcessors(); QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context, null, null); @@ -178,7 +178,7 @@ public class StorageGroupProcessorTest { batchInsertPlan1.setRowCount(times.length); processor.insertBatch(batchInsertPlan1); - processor.putAllWorkingTsFileProcessorIntoClosingList(); + processor.asyncCloseAllTsFileProcessors(); BatchInsertPlan batchInsertPlan2 = new BatchInsertPlan("root.vehicle.d0", measurements, dataTypes); @@ -193,8 +193,8 @@ public class StorageGroupProcessorTest { batchInsertPlan2.setRowCount(times.length); processor.insertBatch(batchInsertPlan2); - processor.putAllWorkingTsFileProcessorIntoClosingList(); - processor.waitForAllCurrentTsFileProcessorsClosed(); + processor.asyncCloseAllTsFileProcessors(); + processor.syncCloseAllTsFileProcessors(); QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context, null, null); @@ -214,18 +214,18 @@ public class StorageGroupProcessorTest { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); processor.insert(new InsertPlan(record)); - processor.putAllWorkingTsFileProcessorIntoClosingList(); + processor.asyncCloseAllTsFileProcessors(); } - processor.waitForAllCurrentTsFileProcessorsClosed(); + processor.syncCloseAllTsFileProcessors(); for (int j = 10; j >= 1; j--) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); processor.insert(new InsertPlan(record)); - processor.putAllWorkingTsFileProcessorIntoClosingList(); + processor.asyncCloseAllTsFileProcessors(); } - processor.waitForAllCurrentTsFileProcessorsClosed(); + processor.syncCloseAllTsFileProcessors(); QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context, null, null); @@ -247,18 +247,18 @@ public class StorageGroupProcessorTest { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); processor.insert(new InsertPlan(record)); - processor.putAllWorkingTsFileProcessorIntoClosingList(); + processor.asyncCloseAllTsFileProcessors(); } - processor.waitForAllCurrentTsFileProcessorsClosed(); + processor.syncCloseAllTsFileProcessors(); for (int j = 10; j >= 1; j--) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); processor.insert(new InsertPlan(record)); - processor.putAllWorkingTsFileProcessorIntoClosingList(); + processor.asyncCloseAllTsFileProcessors(); } - processor.waitForAllCurrentTsFileProcessorsClosed(); + processor.syncCloseAllTsFileProcessors(); processor.merge(true); while (mergeLock.get() == 0) { // wait diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index d28006b..55eb0a8 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -83,7 +83,7 @@ public class TTLTest { @After public void tearDown() throws IOException, StorageEngineException { - storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed(); + storageGroupProcessor.syncCloseAllTsFileProcessors(); EnvironmentUtils.cleanEnv(); } @@ -160,7 +160,7 @@ public class TTLTest { insertPlan.setTime(initTime - 2000 + i); storageGroupProcessor.insert(insertPlan); if ((i + 1) % 300 == 0) { - storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList(); + storageGroupProcessor.asyncCloseAllTsFileProcessors(); } } // unsequence data @@ -168,7 +168,7 @@ public class TTLTest { insertPlan.setTime(initTime - 2000 + i); storageGroupProcessor.insert(insertPlan); if ((i + 1) % 300 == 0) { - storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList(); + storageGroupProcessor.asyncCloseAllTsFileProcessors(); } } } @@ -225,7 +225,7 @@ public class TTLTest { public void testTTLRemoval() throws StorageEngineException, QueryProcessException { prepareData(); - storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed(); + storageGroupProcessor.syncCloseAllTsFileProcessors(); // files before ttl File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1); @@ -335,7 +335,7 @@ public class TTLTest { @Test public void testTTLCleanFile() throws QueryProcessException { prepareData(); - storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed(); + storageGroupProcessor.syncCloseAllTsFileProcessors(); assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size()); assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());
