This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 9a0e9d4d8c6 Speed up recover (#13068) (#13345)
9a0e9d4d8c6 is described below
commit 9a0e9d4d8c6836a009065e33a64f0e21b7e8d23f
Author: Haonan <[email protected]>
AuthorDate: Sun Sep 1 21:16:00 2024 +0800
Speed up recover (#13068) (#13345)
* init
* init
* init
* dev
* dev
* dev
* dev reader
* support read and write FileTimeIndexCache
* support read and write FileTimeIndexCache
* dev recover progress
* update last flush time after async recover finished
* fix package structure
* finish compact logic
* fix UT
* Fix repair data error
* adapt pipe
* try to fix 1c3d IT
* fixing recover from wal not recode fileTimeIndex
* recover from wal need to recode file timeindex cache
* fix sonar bug
* update more
* fix empty FileTimeIndexCache
* batch serialize
* control the thread number of recover
* fix cannot start recover task
* fix delete region
* fix compile issue
* fixing review
* fix review
* fix review and small issue
* fix review and small issue
---
.../iotdb/db/it/IoTDBPartialInsertionIT.java | 2 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 3 +
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 2 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 40 ++-
.../iotdb/db/storageengine/StorageEngine.java | 100 +++++--
.../db/storageengine/dataregion/DataRegion.java | 331 +++++++++++++--------
.../dataregion/DeviceLastFlushTime.java | 4 +
.../dataregion/HashLastFlushTimeMap.java | 92 ++++--
.../dataregion/ILastFlushTimeMap.java | 11 +-
.../schedule/CompactionScheduleTaskWorker.java | 2 +-
.../compaction/schedule/TTLScheduleTask.java | 2 +-
.../dataregion/memtable/TsFileProcessor.java | 2 +
.../storageengine/dataregion/tsfile/TsFileID.java | 23 ++
.../dataregion/tsfile/TsFileManager.java | 27 +-
.../dataregion/tsfile/TsFileResource.java | 15 +
.../timeindex/FileTimeIndexCacheRecorder.java | 227 ++++++++++++++
.../FileTimeIndexCacheReader.java | 84 ++++++
.../FileTimeIndexCacheWriter.java | 103 +++++++
.../file/UnsealedTsFileRecoverPerformer.java | 2 +
.../dataregion/LastFlushTimeMapTest.java | 6 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +
.../iotdb/commons/concurrent/ThreadName.java | 2 +
23 files changed, 895 insertions(+), 189 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
index 074da6873ee..182167be41c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBPartialInsertionIT.java
@@ -108,7 +108,7 @@ public class IoTDBPartialInsertionIT {
EnvironmentUtils.restartDaemon();
StorageEngine.getInstance().recover();
// wait for recover
- while (!StorageEngine.getInstance().isAllSgReady()) {
+ while (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
Thread.sleep(500);
time += 500;
if (time > 10000) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 16f6c6b7d91..a0292d1b245 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -369,6 +369,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
@Override
public synchronized void start() {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ return;
+ }
if (!shouldExtractInsertion) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3604f0c00dd..c7f076a476e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1783,7 +1783,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus startRepairData() throws TException {
- if (!storageEngine.isAllSgReady()) {
+ if (!storageEngine.isReadyForNonReadWriteFunctions()) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all
sg is ready");
}
IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 7ac6b553e5e..f758c1a9252 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1105,7 +1105,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(e);
}
} else {
- if (!StorageEngine.getInstance().isAllSgReady()) {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
future.setException(
new IoTDBException(
"not all sg is ready",
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index fab7eb620d3..d7be7efff4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -243,6 +243,25 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
logger.info("IoTDB configuration: {}", config.getConfigMessage());
logger.info("Congratulations, IoTDB DataNode is set up successfully.
Now, enjoy yourself!");
+ if (isUsingPipeConsensus()) {
+ long dataRegionStartTime = System.currentTimeMillis();
+ while (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions())
{
+ try {
+ TimeUnit.MILLISECONDS.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.warn("IoTDB DataNode failed to set up.", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ DataRegionConsensusImpl.getInstance().start();
+ long dataRegionEndTime = System.currentTimeMillis();
+ logger.info(
+ "DataRegion consensus start successfully, which takes {} ms.",
+ (dataRegionEndTime - dataRegionStartTime));
+ dataRegionConsensusStarted = true;
+ }
+
} catch (StartupException | IOException e) {
logger.error("Fail to start server", e);
stop();
@@ -663,12 +682,14 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
"SchemaRegion consensus start successfully, which takes {} ms.",
(schemaRegionEndTime - startTime));
schemaRegionConsensusStarted = true;
- DataRegionConsensusImpl.getInstance().start();
- long dataRegionEndTime = System.currentTimeMillis();
- logger.info(
- "DataRegion consensus start successfully, which takes {} ms.",
- (dataRegionEndTime - schemaRegionEndTime));
- dataRegionConsensusStarted = true;
+ if (!isUsingPipeConsensus()) {
+ DataRegionConsensusImpl.getInstance().start();
+ long dataRegionEndTime = System.currentTimeMillis();
+ logger.info(
+ "DataRegion consensus start successfully, which takes {} ms.",
+ (dataRegionEndTime - schemaRegionEndTime));
+ dataRegionConsensusStarted = true;
+ }
} catch (IOException e) {
throw new StartupException(e);
}
@@ -717,7 +738,7 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
logger.info(
"IoTDB DataNode is setting up, some databases may not be ready now,
please wait several seconds...");
long startTime = System.currentTimeMillis();
- while (!StorageEngine.getInstance().isAllSgReady()) {
+ while (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
@@ -813,6 +834,11 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
return new TDataNodeConfiguration(location, resource);
}
+ private boolean isUsingPipeConsensus() {
+ return
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)
+ ||
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS);
+ }
+
private void registerUdfServices() throws StartupException {
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 1e4f2953fe9..224a20dc196 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -120,7 +120,7 @@ public class StorageEngine implements IService {
* a folder (system/databases/ by default) that persist system info. Each
database will have a
* subfolder under the systemDir.
*/
- private final String systemDir =
+ private static final String systemDir =
FilePathUtils.regularizePath(CONFIG.getSystemDir()) + "databases";
/** DataRegionId -> DataRegion */
@@ -134,19 +134,21 @@ public class StorageEngine implements IService {
/** number of ready data region */
private AtomicInteger readyDataRegionNum;
- private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
+ private final AtomicBoolean isReadyForReadAndWrite = new AtomicBoolean();
+
+ private final AtomicBoolean isReadyForNonReadWriteFunctions = new
AtomicBoolean();
private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
- private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
+ private final TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
/** used to do short-lived asynchronous tasks */
private ExecutorService cachedThreadPool;
// add customized listeners here for flush and close events
- private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
- private List<FlushListener> customFlushListeners = new ArrayList<>();
+ private final List<CloseFileListener> customCloseFileListeners = new
ArrayList<>();
+ private final List<FlushListener> customFlushListeners = new ArrayList<>();
private int recoverDataRegionNum = 0;
private final LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
@@ -178,17 +180,19 @@ public class StorageEngine implements IService {
}
}
- public boolean isAllSgReady() {
- return isAllSgReady.get();
+ public boolean isReadyForReadAndWrite() {
+ return isReadyForReadAndWrite.get();
}
- public void setAllSgReady(boolean allSgReady) {
- isAllSgReady.set(allSgReady);
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ public boolean isReadyForNonReadWriteFunctions() {
+ return isReadyForNonReadWriteFunctions.get();
}
- public void asyncRecover() throws StartupException {
+ private void asyncRecoverDataRegion() throws StartupException {
long startRecoverTime = System.currentTimeMillis();
- setAllSgReady(false);
+ isReadyForNonReadWriteFunctions.set(false);
+ isReadyForReadAndWrite.set(false);
cachedThreadPool =
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
@@ -209,8 +213,7 @@ public class StorageEngine implements IService {
new Thread(
() -> {
checkResults(futures, "StorageEngine failed to recover.");
- recoverRepairData();
- setAllSgReady(true);
+ isReadyForReadAndWrite.set(true);
LOGGER.info(
"Storage Engine recover cost: {}s.",
(System.currentTimeMillis() - startRecoverTime) / 1000);
@@ -287,11 +290,22 @@ public class StorageEngine implements IService {
throw new StorageEngineFailureException(e);
}
- asyncRecover();
-
- LOGGER.info("start ttl check thread successfully.");
+ asyncRecoverDataRegion();
startTimedService();
+
+ // wait here for dataRegionMap recovered
+ while (!isReadyForReadAndWrite.get()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Storage engine failed to set up.", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+
+ asyncRecoverTsFileResource();
}
private void startTimedService() {
@@ -339,6 +353,36 @@ public class StorageEngine implements IService {
}
}
+ private void asyncRecoverTsFileResource() {
+ List<Future<Void>> futures = new LinkedList<>();
+ for (DataRegion dataRegion : dataRegionMap.values()) {
+ if (dataRegion != null) {
+ List<Callable<Void>> asyncTsFileResourceRecoverTasks =
+ dataRegion.getAsyncTsFileResourceRecoverTaskList();
+ if (asyncTsFileResourceRecoverTasks != null) {
+ Callable<Void> taskOfRegion =
+ () -> {
+ for (Callable<Void> task : asyncTsFileResourceRecoverTasks) {
+ task.call();
+ }
+ dataRegion.initCompactionSchedule();
+ return null;
+ };
+ futures.add(cachedThreadPool.submit(taskOfRegion));
+ }
+ }
+ }
+ Thread recoverEndTrigger =
+ new Thread(
+ () -> {
+ checkResults(futures, "async recover tsfile resource meets
error.");
+ recoverRepairData();
+ isReadyForNonReadWriteFunctions.set(true);
+ },
+ ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
+ recoverEndTrigger.start();
+ }
+
@Override
public void stop() {
for (DataRegion dataRegion : dataRegionMap.values()) {
@@ -649,8 +693,6 @@ public class StorageEngine implements IService {
/**
* Add a listener to listen flush start/end events. Notice that this
addition only applies to
* TsFileProcessors created afterwards.
- *
- * @param listener
*/
public void registerFlushListener(FlushListener listener) {
customFlushListeners.add(listener);
@@ -659,8 +701,6 @@ public class StorageEngine implements IService {
/**
* Add a listener to listen file close events. Notice that this addition
only applies to
* TsFileProcessors created afterwards.
- *
- * @param listener
*/
public void registerCloseFileListener(CloseFileListener listener) {
customCloseFileListeners.add(listener);
@@ -676,7 +716,7 @@ public class StorageEngine implements IService {
}
// When registering a new region, the coordinator needs to register the
corresponding region with
- // the local storageengine before adding the corresponding consensusGroup to
the consensus layer
+ // the local storage before adding the corresponding consensusGroup to the
consensus layer
public DataRegion createDataRegion(DataRegionId regionId, String sg) throws
DataRegionException {
makeSureNoOldRegion(regionId);
AtomicReference<DataRegionException> exceptionAtomicReference = new
AtomicReference<>(null);
@@ -953,6 +993,24 @@ public class StorageEngine implements IService {
});
}
+ public static File getDataRegionSystemDir(String dataBaseName, String
dataRegionId) {
+ return SystemFileFactory.INSTANCE.getFile(
+ systemDir + File.separator + dataBaseName, dataRegionId);
+ }
+
+ public Runnable executeCompactFileTimeIndexCache() {
+ return () -> {
+ if (!isReadyForNonReadWriteFunctions()) {
+ return;
+ }
+ for (DataRegion dataRegion : dataRegionMap.values()) {
+ if (dataRegion != null) {
+ dataRegion.compactFileTimeIndexCache();
+ }
+ }
+ };
+ }
+
static class InstanceHolder {
private static final StorageEngine INSTANCE = new StorageEngine();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index e30af047e2d..232e1dd09e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -89,11 +89,15 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle;
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.ClosedFileScanHandleImpl;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheReader;
import
org.apache.iotdb.db.storageengine.dataregion.utils.validate.TsFileValidator;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
@@ -148,6 +152,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -243,8 +248,8 @@ public class DataRegion implements IDataRegionForQuery {
/** database name. */
private final String databaseName;
- /** database system directory. */
- private File storageGroupSysDir;
+ /** data region system directory. */
+ private File dataRegionSysDir;
/** manage seqFileList and unSeqFileList. */
private final TsFileManager tsFileManager;
@@ -280,6 +285,8 @@ public class DataRegion implements IDataRegionForQuery {
/** whether it's ready from recovery. */
private boolean isReady = false;
+ private List<Callable<Void>> asyncTsFileResourceRecoverTaskList;
+
/** close file listeners. */
private List<CloseFileListener> customCloseFileListeners =
Collections.emptyList();
@@ -320,21 +327,20 @@ public class DataRegion implements IDataRegionForQuery {
this.fileFlushPolicy = fileFlushPolicy;
acquireDirectBufferMemory();
- storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir,
dataRegionId);
- this.tsFileManager =
- new TsFileManager(databaseName, dataRegionId,
storageGroupSysDir.getPath());
- if (storageGroupSysDir.mkdirs()) {
+ dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir,
dataRegionId);
+ this.tsFileManager = new TsFileManager(databaseName, dataRegionId,
dataRegionSysDir.getPath());
+ if (dataRegionSysDir.mkdirs()) {
logger.info(
- "Database system Directory {} doesn't exist, create it",
storageGroupSysDir.getPath());
- } else if (!storageGroupSysDir.exists()) {
- logger.error("create database system Directory {} failed",
storageGroupSysDir.getPath());
+ "Database system Directory {} doesn't exist, create it",
dataRegionSysDir.getPath());
+ } else if (!dataRegionSysDir.exists()) {
+ logger.error("create database system Directory {} failed",
dataRegionSysDir.getPath());
}
lastFlushTimeMap = new HashLastFlushTimeMap();
- // recover tsfiles unless consensus protocol is ratis and storage
storageengine is not ready
+ // recover tsfiles unless consensus protocol is ratis and storage engine
is not ready
if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
- && !StorageEngine.getInstance().isAllSgReady()) {
+ && !StorageEngine.getInstance().isReadyForReadAndWrite()) {
logger.debug(
"Skip recovering data region {}[{}] when consensus protocol is ratis
and storage engine is not ready.",
databaseName,
@@ -356,6 +362,7 @@ public class DataRegion implements IDataRegionForQuery {
}
}
} else {
+ asyncTsFileResourceRecoverTaskList = new ArrayList<>();
recover();
}
@@ -380,17 +387,8 @@ public class DataRegion implements IDataRegionForQuery {
return isReady;
}
- public void setReady(boolean ready) {
- isReady = ready;
- }
-
- private Map<Long, List<TsFileResource>> splitResourcesByPartition(
- List<TsFileResource> resources) {
- Map<Long, List<TsFileResource>> ret = new TreeMap<>();
- for (TsFileResource resource : resources) {
- ret.computeIfAbsent(resource.getTimePartition(), l -> new
ArrayList<>()).add(resource);
- }
- return ret;
+ public List<Callable<Void>> getAsyncTsFileResourceRecoverTaskList() {
+ return asyncTsFileResourceRecoverTaskList;
}
/** this class is used to store recovering context. */
@@ -451,19 +449,16 @@ public class DataRegion implements IDataRegionForQuery {
try {
// collect candidate TsFiles from sequential and unsequential data
directory
- List<TsFileResource> tmpSeqTsFiles =
-
getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
- List<TsFileResource> tmpUnseqTsFiles =
-
getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
-
// split by partition so that we can find the last file of each
partition and decide to
// close it or not
- DataRegionRecoveryContext dataRegionRecoveryContext =
- new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() +
tmpUnseqTsFiles.size());
Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
- splitResourcesByPartition(tmpSeqTsFiles);
+
getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
- splitResourcesByPartition(tmpUnseqTsFiles);
+
getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
+ DataRegionRecoveryContext dataRegionRecoveryContext =
+ new DataRegionRecoveryContext(
+
partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
+ +
partitionTmpUnseqTsFiles.values().stream().mapToLong(List::size).sum());
// submit unsealed TsFiles to recover
List<WALRecoverListener> recoverListeners = new ArrayList<>();
for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
@@ -544,13 +539,27 @@ public class DataRegion implements IDataRegionForQuery {
((TreeMap<Long, List<TsFileResource>>)
partitionTmpUnseqTsFiles).lastKey());
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpSeqTsFiles.entrySet()) {
- recoverFilesInPartition(
- partitionFiles.getKey(), dataRegionRecoveryContext,
partitionFiles.getValue(), true);
+ Callable<Void> asyncRecoverTask =
+ recoverFilesInPartition(
+ partitionFiles.getKey(),
+ dataRegionRecoveryContext,
+ partitionFiles.getValue(),
+ true);
+ if (asyncRecoverTask != null) {
+ asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
+ }
}
for (Entry<Long, List<TsFileResource>> partitionFiles :
partitionTmpUnseqTsFiles.entrySet()) {
- recoverFilesInPartition(
- partitionFiles.getKey(), dataRegionRecoveryContext,
partitionFiles.getValue(), false);
+ Callable<Void> asyncRecoverTask =
+ recoverFilesInPartition(
+ partitionFiles.getKey(),
+ dataRegionRecoveryContext,
+ partitionFiles.getValue(),
+ false);
+ if (asyncRecoverTask != null) {
+ asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
+ }
}
if (config.isEnableSeparateData()) {
TimePartitionManager.getInstance()
@@ -599,16 +608,25 @@ public class DataRegion implements IDataRegionForQuery {
throw new DataRegionException(e);
}
- initCompactionSchedule();
+ if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
+ initCompactionSchedule();
+ }
- if (StorageEngine.getInstance().isAllSgReady()) {
+ if (StorageEngine.getInstance().isReadyForReadAndWrite()) {
logger.info("The data region {}[{}] is created successfully",
databaseName, dataRegionId);
} else {
logger.info("The data region {}[{}] is recovered successfully",
databaseName, dataRegionId);
}
}
- private void updateLastFlushTime(TsFileResource resource, boolean isSeq) {
+ private void updatePartitionLastFlushTime(TsFileResource resource) {
+ if (config.isEnableSeparateData()) {
+ lastFlushTimeMap.updatePartitionFlushedTime(
+ resource.getTimePartition(),
resource.getTimeIndex().getMaxEndTime());
+ }
+ }
+
+ protected void updateDeviceLastFlushTime(TsFileResource resource) {
long timePartitionId = resource.getTimePartition();
Map<IDeviceID, Long> endTimeMap = new HashMap<>();
for (IDeviceID deviceId : resource.getDevices()) {
@@ -623,6 +641,23 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ protected void upgradeAndUpdateDeviceLastFlushTime(
+ long timePartitionId, List<TsFileResource> resources) {
+ Map<IDeviceID, Long> endTimeMap = new HashMap<>();
+ for (TsFileResource resource : resources) {
+ for (IDeviceID deviceId : resource.getDevices()) {
+ long endTime = resource.getEndTime(deviceId);
+ endTimeMap.put(deviceId, endTime);
+ }
+ }
+ if (config.isEnableSeparateData()) {
+ lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId,
endTimeMap);
+ }
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+ lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
+ }
+ }
+
public void initCompactionSchedule() {
if (!config.isEnableSeqSpaceCompaction()
&& !config.isEnableUnseqSpaceCompaction()
@@ -646,7 +681,7 @@ public class DataRegion implements IDataRegionForQuery {
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- private List<TsFileResource> getAllFiles(List<String> folders)
+ private Map<Long, List<TsFileResource>> getAllFiles(List<String> folders)
throws IOException, DataRegionException {
// "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files
in one time partition
Map<String, File> tsFilePartitionPath2File = new HashMap<>();
@@ -684,10 +719,12 @@ public class DataRegion implements IDataRegionForQuery {
sortedFiles.sort(this::compareFileName);
long currentTime = System.currentTimeMillis();
- List<TsFileResource> ret = new ArrayList<>();
+ Map<Long, List<TsFileResource>> ret = new TreeMap<>();
for (File f : sortedFiles) {
checkTsFileTime(f, currentTime);
- ret.add(new TsFileResource(f));
+ TsFileResource resource = new TsFileResource(f);
+ ret.computeIfAbsent(resource.getTsFileID().timePartitionId, l -> new
ArrayList<>())
+ .add(resource);
}
return ret;
}
@@ -743,7 +780,7 @@ public class DataRegion implements IDataRegionForQuery {
tsFileResource.remove();
return;
}
- updateLastFlushTime(tsFileResource, isSeq);
+ updateDeviceLastFlushTime(tsFileResource);
tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
FileMetrics.getInstance()
.addTsFile(
@@ -821,7 +858,102 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- private void recoverFilesInPartition(
+ private Callable<Void> recoverFilesInPartition(
+ long partitionId,
+ DataRegionRecoveryContext context,
+ List<TsFileResource> resourceList,
+ boolean isSeq) {
+
+ File partitionSysDir =
+ SystemFileFactory.INSTANCE.getFile(dataRegionSysDir,
String.valueOf(partitionId));
+ File logFile = SystemFileFactory.INSTANCE.getFile(partitionSysDir,
"FileTimeIndexCache_0");
+ if (logFile.exists()) {
+ Map<TsFileID, FileTimeIndex> fileTimeIndexMap;
+ try {
+ FileTimeIndexCacheReader logReader =
+ new FileTimeIndexCacheReader(logFile, dataRegionId, partitionId);
+ fileTimeIndexMap = logReader.read();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
+ List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
+ Callable<Void> asyncRecoverTask = null;
+ for (TsFileResource tsFileResource : resourceList) {
+ if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID())) {
+
tsFileResource.setTimeIndex(fileTimeIndexMap.get(tsFileResource.getTsFileID()));
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+ tsFileManager.add(tsFileResource, isSeq);
+ resourceListForAsyncRecover.add(tsFileResource);
+ } else {
+ resourceListForSyncRecover.add(tsFileResource);
+ }
+ }
+ if (!resourceListForAsyncRecover.isEmpty()) {
+ asyncRecoverTask =
+ asyncRecoverFilesInPartition(partitionId, context,
resourceListForAsyncRecover, isSeq);
+ }
+ if (!resourceListForSyncRecover.isEmpty()) {
+ syncRecoverFilesInPartition(partitionId, context,
resourceListForSyncRecover, isSeq);
+ }
+ return asyncRecoverTask;
+ } else {
+ syncRecoverFilesInPartition(partitionId, context, resourceList, isSeq);
+ return null;
+ }
+ }
+
+ private Callable<Void> asyncRecoverFilesInPartition(
+ long partitionId,
+ DataRegionRecoveryContext context,
+ List<TsFileResource> resourceList,
+ boolean isSeq) {
+ if (config.isEnableSeparateData()) {
+ if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId,
false)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ partitionId,
+ false,
+ Long.MAX_VALUE,
+ lastFlushTimeMap.getMemSize(partitionId)));
+ }
+ for (TsFileResource tsFileResource : resourceList) {
+ updatePartitionLastFlushTime(tsFileResource);
+ }
+ TimePartitionManager.getInstance()
+ .updateAfterFlushing(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ partitionId,
+ System.currentTimeMillis(),
+ lastFlushTimeMap.getMemSize(partitionId),
+ false);
+ }
+ return () -> {
+ for (TsFileResource tsFileResource : resourceList) {
+ try (SealedTsFileRecoverPerformer recoverPerformer =
+ new SealedTsFileRecoverPerformer(tsFileResource)) {
+ recoverPerformer.recover();
+ tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+ } catch (Throwable e) {
+ logger.error(
+ "Fail to recover sealed TsFile {}, skip it.",
tsFileResource.getTsFilePath(), e);
+ } finally {
+ // update recovery context
+ context.incrementRecoveredFilesNum();
+ }
+ }
+ // After recover, replace partition last flush time with device last
flush time
+ if (config.isEnableSeparateData()) {
+ upgradeAndUpdateDeviceLastFlushTime(partitionId, resourceList);
+ }
+
+ return null;
+ };
+ }
+
+ private void syncRecoverFilesInPartition(
long partitionId,
DataRegionRecoveryContext context,
List<TsFileResource> resourceList,
@@ -829,8 +961,10 @@ public class DataRegion implements IDataRegionForQuery {
for (TsFileResource tsFileResource : resourceList) {
recoverSealedTsFiles(tsFileResource, context, isSeq);
}
+ FileTimeIndexCacheRecorder.getInstance()
+ .logFileTimeIndex(resourceList.toArray(new TsFileResource[0]));
if (config.isEnableSeparateData()) {
- if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId)) {
+ if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId,
true)) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -841,7 +975,7 @@ public class DataRegion implements IDataRegionForQuery {
lastFlushTimeMap.getMemSize(partitionId)));
}
for (TsFileResource tsFileResource : resourceList) {
- updateLastFlushTime(tsFileResource, isSeq);
+ updateDeviceLastFlushTime(tsFileResource);
}
TimePartitionManager.getInstance()
.updateAfterFlushing(
@@ -890,19 +1024,7 @@ public class DataRegion implements IDataRegionForQuery {
}
// init map
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
-
- if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
- timePartitionId,
- true,
- Long.MAX_VALUE,
- 0));
- }
-
+ initFlushTimeMap(timePartitionId);
boolean isSequence =
config.isEnableSeparateData()
&& insertRowNode.getTime()
@@ -988,18 +1110,7 @@ public class DataRegion implements IDataRegionForQuery {
long beforeTimePartition =
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
// init map
-
- if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
- beforeTimePartition,
- true,
- Long.MAX_VALUE,
- 0));
- }
+ initFlushTimeMap(beforeTimePartition);
long lastFlushTime =
config.isEnableSeparateData()
@@ -1059,6 +1170,20 @@ public class DataRegion implements IDataRegionForQuery {
return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() -
time) <= dataTTL;
}
+ private void initFlushTimeMap(long timePartitionId) {
+ if (config.isEnableSeparateData()
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ timePartitionId,
+ true,
+ Long.MAX_VALUE,
+ 0));
+ }
+ }
+
/**
* insert batch to tsfile processor thread-safety that the caller need to
guarantee The rows to be
* inserted are in the range [start, end) Null value in each column values
will be replaced by the
@@ -1554,6 +1679,8 @@ public class DataRegion implements IDataRegionForQuery {
"{} will close all files for deleting data folder {}",
databaseName + "-" + dataRegionId,
systemDir);
+ FileTimeIndexCacheRecorder.getInstance()
+ .removeFileTimeIndexCache(Integer.parseInt(dataRegionId));
writeLock("deleteFolder");
try {
File dataRegionSystemFolder =
@@ -2769,17 +2896,8 @@ public class DataRegion implements IDataRegionForQuery {
if (config.isEnableSeparateData()) {
final DataRegionId dataRegionId = new
DataRegionId(Integer.parseInt(this.dataRegionId));
final long timePartitionId = newTsFileResource.getTimePartition();
- if
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- dataRegionId,
- timePartitionId,
- false,
- Long.MAX_VALUE,
- lastFlushTimeMap.getMemSize(timePartitionId)));
- }
- updateLastFlushTime(newTsFileResource);
+ initFlushTimeMap(timePartitionId);
+ updateDeviceLastFlushTime(newTsFileResource);
TimePartitionManager.getInstance()
.updateAfterFlushing(
dataRegionId,
@@ -2905,23 +3023,6 @@ public class DataRegion implements IDataRegionForQuery {
return version;
}
- /**
- * Update latest time in latestTimeForEachDevice and
- * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
- */
- protected void updateLastFlushTime(TsFileResource newTsFileResource) {
- for (IDeviceID device : newTsFileResource.getDevices()) {
- long endTime = newTsFileResource.getEndTime(device);
- long timePartitionId = TimePartitionUtils.getTimePartitionId(endTime);
- if (config.isEnableSeparateData()) {
- lastFlushTimeMap.updateOneDeviceFlushedTime(timePartitionId, device,
endTime);
- }
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- lastFlushTimeMap.updateOneDeviceGlobalFlushedTime(device, endTime);
- }
- }
- }
-
/**
* Execute the loading process by the type.
*
@@ -3292,17 +3393,7 @@ public class DataRegion implements IDataRegionForQuery {
// init map
long timePartitionId =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
- if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.valueOf(dataRegionId)),
- timePartitionId,
- true,
- Long.MAX_VALUE,
- 0));
- }
+ initFlushTimeMap(timePartitionId);
boolean isSequence =
config.isEnableSeparateData()
@@ -3407,17 +3498,7 @@ public class DataRegion implements IDataRegionForQuery {
// init map
timePartitionIds[i] =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
- if (config.isEnableSeparateData()
- &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i])) {
- TimePartitionManager.getInstance()
- .registerTimePartitionInfo(
- new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
- timePartitionIds[i],
- true,
- Long.MAX_VALUE,
- 0));
- }
+ initFlushTimeMap(timePartitionIds[i]);
areSequence[i] =
config.isEnableSeparateData()
&& insertRowNode.getTime()
@@ -3516,6 +3597,10 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ public File getDataRegionSysDir() {
+ return dataRegionSysDir;
+ }
+
public void addSettleFilesToList(
List<TsFileResource> seqResourcesToBeSettled,
List<TsFileResource> unseqResourcesToBeSettled,
@@ -3694,6 +3779,12 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ public void compactFileTimeIndexCache() {
+ for (long timePartition : partitionMaxFileVersions.keySet()) {
+ tsFileManager.compactFileTimeIndexCache(timePartition);
+ }
+ }
+
@TestOnly
public ILastFlushTimeMap getLastFlushTimeMap() {
return lastFlushTimeMap;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
index fe62176469b..f02044b0414 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
@@ -49,4 +49,8 @@ public class DeviceLastFlushTime implements ILastFlushTime {
}
return new PartitionLastFlushTime(maxTime);
}
+
+ Map<IDeviceID, Long> getDeviceLastFlushTimeMap() {
+ return deviceLastFlushTimeMap;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 4060eb97bb2..3f0abfd3e24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+
import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,22 +61,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
/** record memory cost of map for each partitionId */
private final Map<Long, Long> memCostForEachPartition = new
ConcurrentHashMap<>();
- // For load
- @Override
- public void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID
deviceId, long time) {
- ILastFlushTime flushTimeMapForPartition =
- partitionLatestFlushedTime.computeIfAbsent(
- timePartitionId, id -> new DeviceLastFlushTime());
- long lastFlushTime = flushTimeMapForPartition.getLastFlushTime(deviceId);
- if (lastFlushTime == Long.MIN_VALUE) {
- long memCost = HASHMAP_NODE_BASIC_SIZE + deviceId.ramBytesUsed();
- memCostForEachPartition.compute(
- timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
- }
- flushTimeMapForPartition.updateLastFlushTime(deviceId, time);
- }
-
- // For recover
+ // For sync recover resource without fileTimeIndexCache and load
@Override
public void updateMultiDeviceFlushedTime(
long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
@@ -82,7 +69,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
partitionLatestFlushedTime.computeIfAbsent(
timePartitionId, id -> new DeviceLastFlushTime());
- long memIncr = 0;
+ long memIncr = 0L;
for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
@@ -94,10 +81,60 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
}
+ // For async recover resource with fileTimeIndexCache
@Override
- public void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time) {
- globalLatestFlushedTimeForEachDevice.compute(
- path, (k, v) -> v == null ? time : Math.max(v, time));
+ public void upgradeAndUpdateMultiDeviceFlushedTime(
+ long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new DeviceLastFlushTime());
+ // upgrade DeviceLastFlushTime to PartitionLastFlushTime
+ if (flushTimeMapForPartition instanceof PartitionLastFlushTime) {
+ long maxFlushTime = flushTimeMapForPartition.getLastFlushTime(null);
+ ILastFlushTime newDeviceLastFlushTime = new DeviceLastFlushTime();
+ long memIncr = 0;
+ for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+ newDeviceLastFlushTime.updateLastFlushTime(entry.getKey(),
entry.getValue());
+ maxFlushTime = Math.max(maxFlushTime, entry.getValue());
+ }
+ long finalMemIncr = memIncr;
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
+ } else {
+ // go here when DeviceLastFlushTime was recovered by wal recovery
+ long memIncr = 0;
+ for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
+ memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+ }
+ flushTimeMapForPartition.updateLastFlushTime(entry.getKey(),
entry.getValue());
+ }
+ long finalMemIncr = memIncr;
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
+ }
+ }
+
+ // For fileTimeIndexCache recovered before the async resource recover start
+ @Override
+ public void updatePartitionFlushedTime(long timePartitionId, long
maxFlushedTime) {
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new PartitionLastFlushTime(maxFlushedTime));
+
+ if (flushTimeMapForPartition instanceof PartitionLastFlushTime) {
+ long memIncr = Long.BYTES;
+ flushTimeMapForPartition.updateLastFlushTime(null, maxFlushedTime);
+ memCostForEachPartition.putIfAbsent(timePartitionId, memIncr);
+ } else {
+ // go here when DeviceLastFlushTime was recovered by wal recovery
+ DeviceLastFlushTime deviceLastFlushTime = (DeviceLastFlushTime)
flushTimeMapForPartition;
+ Map<IDeviceID, Long> flushedTimeMap =
deviceLastFlushTime.getDeviceLastFlushTimeMap();
+ for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ flushTimeMapForPartition.updateLastFlushTime(entry.getKey(),
entry.getValue());
+ }
+ }
}
@Override
@@ -108,9 +145,14 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
@Override
- public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
+ public boolean checkAndCreateFlushedTimePartition(
+ long timePartitionId, boolean usingDeviceFlushTime) {
if (!partitionLatestFlushedTime.containsKey(timePartitionId)) {
- partitionLatestFlushedTime.put(timePartitionId, new
DeviceLastFlushTime());
+ partitionLatestFlushedTime.put(
+ timePartitionId,
+ usingDeviceFlushTime
+ ? new DeviceLastFlushTime()
+ : new PartitionLastFlushTime(Long.MIN_VALUE));
return false;
}
return true;
@@ -135,8 +177,14 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
return
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
}
+ // This method is for creating last cache entry when insert
@Override
public long getGlobalFlushedTime(IDeviceID path) {
+ // If TsFileResource is not fully recovered, we should return
Long.MAX_VALUE
+ // to avoid create Last cache entry
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+ return Long.MAX_VALUE;
+ }
return globalLatestFlushedTimeForEachDevice.getOrDefault(path,
Long.MIN_VALUE);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index e809730cc0e..7bdd141bf6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -27,23 +27,22 @@ import java.util.Map;
public interface ILastFlushTimeMap {
// region update
- /** Update partitionLatestFlushedTime. */
- void updateOneDeviceFlushedTime(long timePartitionId, IDeviceID deviceId,
long time);
-
void updateMultiDeviceFlushedTime(long timePartitionId, Map<IDeviceID, Long>
flushedTimeMap);
- /** Update globalLatestFlushedTimeForEachDevice. */
- void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time);
+ void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap);
+ void upgradeAndUpdateMultiDeviceFlushedTime(
+ long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
+
/** Update both partitionLatestFlushedTime and
globalLatestFlushedTimeForEachDevice. */
void updateLatestFlushTime(long partitionId, Map<IDeviceID, Long> updateMap);
// endregion
// region ensure
- boolean checkAndCreateFlushedTimePartition(long timePartitionId);
+ boolean checkAndCreateFlushedTimePartition(long timePartitionId, boolean
usingDeviceFlushTime);
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index 3469a0a6dec..17ad0dd4334 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -53,7 +53,7 @@ public class CompactionScheduleTaskWorker implements
Callable<Void> {
while (true) {
try {
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getCompactionScheduleIntervalInMs());
- if (!StorageEngine.getInstance().isAllSgReady()) {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
continue;
}
List<DataRegion> dataRegionListSnapshot = new
ArrayList<>(dataRegionList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index f7341876ec5..d7757bb3ff9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -53,7 +53,7 @@ public class TTLScheduleTask implements Callable<Void> {
while (true) {
try {
Thread.sleep(ttlCheckInterval);
- if (!StorageEngine.getInstance().isAllSgReady()) {
+ if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
continue;
}
List<DataRegion> dataRegionListSnapshot = new
ArrayList<>(dataRegionList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index f30dd1799d4..bc69d29d10d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -64,6 +64,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskAlign
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.DiskChunkHandleImpl;
import
org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.UnclosedFileScanHandleImpl;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
@@ -1583,6 +1584,7 @@ public class TsFileProcessor {
}
writer.endFile();
tsFileResource.serialize();
+ FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource);
if (logger.isDebugEnabled()) {
logger.debug("Ended file {}", tsFileResource);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
index e3bb6adc748..c9656382e3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileID.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
+import java.util.Objects;
+
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.tsfile.utils.FilePathUtils.splitTsFilePath;
@@ -104,6 +106,27 @@ public class TsFileID {
return versionArray;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TsFileID that = (TsFileID) o;
+ return regionId == that.regionId
+ && timePartitionId == that.timePartitionId
+ && timestamp == that.timestamp
+ && fileVersion == that.fileVersion
+ && compactionVersion == that.compactionVersion;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(regionId, timePartitionId, timestamp, fileVersion,
compactionVersion);
+ }
+
public long getTimestamp() {
return timestamp;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index dc242858bce..06409cf3cf8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import org.apache.tsfile.read.filter.basic.Filter;
@@ -39,7 +40,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TsFileManager {
private final String storageGroupName;
private String dataRegionId;
- private final String storageGroupDir;
+ private final String dataRegionSysDir;
/** Serialize queries, delete resource files, compaction cleanup files */
private final ReadWriteLock resourceListLock = new ReentrantReadWriteLock();
@@ -52,9 +53,9 @@ public class TsFileManager {
private volatile boolean allowCompaction = true;
private final AtomicLong currentCompactionTaskSerialId = new AtomicLong(0);
- public TsFileManager(String storageGroupName, String dataRegionId, String
storageGroupDir) {
+ public TsFileManager(String storageGroupName, String dataRegionId, String
dataRegionSysDir) {
this.storageGroupName = storageGroupName;
- this.storageGroupDir = storageGroupDir;
+ this.dataRegionSysDir = dataRegionSysDir;
this.dataRegionId = dataRegionId;
}
@@ -260,6 +261,7 @@ public class TsFileManager {
.computeIfAbsent(timePartition, t -> new TsFileResourceList())
.keepOrderInsert(resource);
}
+ FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(resource);
}
}
} finally {
@@ -339,8 +341,8 @@ public class TsFileManager {
return storageGroupName;
}
- public String getStorageGroupDir() {
- return storageGroupDir;
+ public String getDataRegionSysDir() {
+ return dataRegionSysDir;
}
public Set<Long> getTimePartitions() {
@@ -389,4 +391,19 @@ public class TsFileManager {
return (sequenceFiles.higherKey(timePartitionId) == null
&& unsequenceFiles.higherKey(timePartitionId) == null);
}
+
+ public void compactFileTimeIndexCache(long timePartition) {
+ readLock();
+ try {
+ FileTimeIndexCacheRecorder.getInstance()
+ .compactFileTimeIndexIfNeeded(
+ storageGroupName,
+ Integer.parseInt(dataRegionId),
+ timePartition,
+ sequenceFiles.get(timePartition),
+ unsequenceFiles.get(timePartition));
+ } finally {
+ readUnlock();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 6159149d9f4..bc24dec1049 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -60,6 +60,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -292,6 +293,20 @@ public class TsFileResource {
}
}
+ public static int getFileTimeIndexSerializedSize() {
+ // 5 * 8 Byte means 5 long numbers of tsFileID.timestamp,
tsFileID.fileVersion
+ // tsFileID.compactionVersion, timeIndex.getMinStartTime(),
timeIndex.getMaxStartTime()
+ return 5 * Long.BYTES;
+ }
+
+ public void serializeFileTimeIndexToByteBuffer(ByteBuffer buffer) {
+ buffer.putLong(tsFileID.timestamp);
+ buffer.putLong(tsFileID.fileVersion);
+ buffer.putLong(tsFileID.compactionVersion);
+ buffer.putLong(timeIndex.getMinStartTime());
+ buffer.putLong(timeIndex.getMaxEndTime());
+ }
+
public void updateStartTime(IDeviceID device, long time) {
timeIndex.updateStartTime(device, time);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
new file mode 100644
index 00000000000..b0c805bba6b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent;
+import static
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
+
+public class FileTimeIndexCacheRecorder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTimeIndexCacheRecorder.class);
+
+ private static final int VERSION = 0;
+
+ protected static final String FILE_NAME = "FileTimeIndexCache_" + VERSION;
+
+ private final ScheduledExecutorService recordFileIndexThread;
+
+ private final BlockingQueue<Runnable> taskQueue = new
LinkedBlockingQueue<>();
+
+ private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
+ new ConcurrentHashMap<>();
+
+ private FileTimeIndexCacheRecorder() {
+ recordFileIndexThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.FILE_TIME_INDEX_RECORD.getName());
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ recordFileIndexThread, this::executeTasks, 100, 100,
TimeUnit.MILLISECONDS);
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ recordFileIndexThread,
+ StorageEngine.getInstance().executeCompactFileTimeIndexCache(),
+ 120_000L,
+ 120_000L,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void executeTasks() {
+ Runnable task;
+ while ((task = taskQueue.poll()) != null) {
+ recordFileIndexThread.submit(task);
+ }
+ }
+
+ public void logFileTimeIndex(TsFileResource... tsFileResources) {
+ if (tsFileResources != null && tsFileResources.length > 0) {
+ TsFileResource firstResource = tsFileResources[0];
+ TsFileID tsFileID = firstResource.getTsFileID();
+ int dataRegionId = tsFileID.regionId;
+ long partitionId = tsFileID.timePartitionId;
+ File dataRegionSysDir =
+ StorageEngine.getDataRegionSystemDir(
+ firstResource.getDatabaseName(),
firstResource.getDataRegionId());
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(
+ getFileTimeIndexSerializedSize() *
tsFileResources.length);
+ for (TsFileResource tsFileResource : tsFileResources) {
+ tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache: {}",
e.getMessage());
+ }
+ });
+ if (!result) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache");
+ }
+ }
+ }
+
+ public void compactFileTimeIndexIfNeeded(
+ String dataBaseName,
+ int dataRegionId,
+ long partitionId,
+ TsFileResourceList sequenceFiles,
+ TsFileResourceList unsequenceFiles) {
+ FileTimeIndexCacheWriter writer =
+ getWriter(
+ dataRegionId,
+ partitionId,
+ StorageEngine.getDataRegionSystemDir(dataBaseName,
String.valueOf(dataRegionId)));
+
+ int currentResourceCount =
+ (sequenceFiles == null ? 0 : sequenceFiles.size())
+ + (unsequenceFiles == null ? 0 : unsequenceFiles.size());
+ if (writer.getLogFile().length()
+ > currentResourceCount * getFileTimeIndexSerializedSize() * 100L) {
+
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ writer.clearFile();
+ if (sequenceFiles != null && !sequenceFiles.isEmpty()) {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(
+ getFileTimeIndexSerializedSize() *
sequenceFiles.size());
+ for (TsFileResource tsFileResource : sequenceFiles) {
+
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
+ }
+ if (unsequenceFiles != null && !unsequenceFiles.isEmpty()) {
+ ByteBuffer buffer =
+ ByteBuffer.allocate(
+ getFileTimeIndexSerializedSize() *
unsequenceFiles.size());
+ for (TsFileResource tsFileResource : unsequenceFiles) {
+
tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when compact FileTimeIndexCache:
{}", e.getMessage());
+ }
+ });
+ if (!result) {
+ LOGGER.warn("Meet error when compact FileTimeIndexCache");
+ }
+ }
+ }
+
+ private FileTimeIndexCacheWriter getWriter(
+ int dataRegionId, long partitionId, File dataRegionSysDir) {
+ return writerMap
+ .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(
+ partitionId,
+ k -> {
+ File partitionDir =
+ SystemFileFactory.INSTANCE.getFile(dataRegionSysDir,
String.valueOf(partitionId));
+ File logFile = SystemFileFactory.INSTANCE.getFile(partitionDir,
FILE_NAME);
+ try {
+ if (!partitionDir.exists() && !partitionDir.mkdirs()) {
+ LOGGER.debug(
+ "Partition directory has existed,filePath:{}",
+ partitionDir.getAbsolutePath());
+ }
+ if (!logFile.createNewFile()) {
+ LOGGER.debug(
+ "Partition log file has existed,filePath:{}",
logFile.getAbsolutePath());
+ }
+ return new FileTimeIndexCacheWriter(logFile, true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public void close() throws IOException {
+ for (Map<Long, FileTimeIndexCacheWriter> partitionWriterMap :
writerMap.values()) {
+ for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
+ writer.close();
+ }
+ }
+ }
+
+ public void removeFileTimeIndexCache(int dataRegionId) {
+ Map<Long, FileTimeIndexCacheWriter> partitionWriterMap =
writerMap.get(dataRegionId);
+ if (partitionWriterMap != null) {
+ for (FileTimeIndexCacheWriter writer : partitionWriterMap.values()) {
+ try {
+ writer.close();
+ deleteDirectoryAndEmptyParent(writer.getLogFile());
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when close FileTimeIndexCache: {}",
e.getMessage());
+ }
+ }
+ }
+ }
+
+ public static FileTimeIndexCacheRecorder getInstance() {
+ return FileTimeIndexCacheRecorder.InstanceHolder.INSTANCE;
+ }
+
+ private static class InstanceHolder {
+ private InstanceHolder() {}
+
+ private static final FileTimeIndexCacheRecorder INSTANCE = new
FileTimeIndexCacheRecorder();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
new file mode 100644
index 00000000000..35682bd8020
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.getFileTimeIndexSerializedSize;
+
+public class FileTimeIndexCacheReader {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTimeIndexCacheReader.class);
+
+ private final File logFile;
+ private final long fileLength;
+ private final int dataRegionId;
+ private final long partitionId;
+
+ public FileTimeIndexCacheReader(File logFile, String dataRegionId, long
partitionId) {
+ this.logFile = logFile;
+ this.fileLength = logFile.length();
+ this.dataRegionId = Integer.parseInt(dataRegionId);
+ this.partitionId = partitionId;
+ }
+
+ public Map<TsFileID, FileTimeIndex> read() throws IOException {
+ Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
+ long readLength = 0L;
+ try (DataInputStream logStream =
+ new DataInputStream(new
BufferedInputStream(Files.newInputStream(logFile.toPath())))) {
+ while (readLength < fileLength) {
+ long timestamp = logStream.readLong();
+ long fileVersion = logStream.readLong();
+ long compactionVersion = logStream.readLong();
+ long minStartTime = logStream.readLong();
+ long maxEndTime = logStream.readLong();
+ TsFileID tsFileID =
+ new TsFileID(dataRegionId, partitionId, timestamp, fileVersion,
compactionVersion);
+ FileTimeIndex fileTimeIndex = new FileTimeIndex(minStartTime,
maxEndTime);
+ fileTimeIndexMap.put(tsFileID, fileTimeIndex);
+ readLength += getFileTimeIndexSerializedSize();
+ }
+ } catch (IOException ignored) {
+ // the error can be ignored
+ }
+ if (readLength != fileLength) {
+ // if the file is complete, we can truncate the file
+ try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.WRITE)) {
+ channel.truncate(readLength);
+ }
+ }
+ return fileTimeIndexMap;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
new file mode 100644
index 00000000000..dce4f8b1103
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/fileTimeIndexCache/FileTimeIndexCacheWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache;
+
+import org.apache.iotdb.db.utils.writelog.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+
+public class FileTimeIndexCacheWriter implements ILogWriter {
+ private static final Logger logger =
LoggerFactory.getLogger(FileTimeIndexCacheWriter.class);
+
+ private final File logFile;
+ private FileOutputStream fileOutputStream;
+ private FileChannel channel;
+ private final boolean forceEachWrite;
+
+ public FileTimeIndexCacheWriter(File logFile, boolean forceEachWrite)
+ throws FileNotFoundException {
+ this.logFile = logFile;
+ this.forceEachWrite = forceEachWrite;
+
+ fileOutputStream = new FileOutputStream(logFile, true);
+ channel = fileOutputStream.getChannel();
+ }
+
+ @Override
+ public void write(ByteBuffer logBuffer) throws IOException {
+
+ try {
+ channel.write(logBuffer);
+ if (this.forceEachWrite) {
+ channel.force(true);
+ }
+ } catch (ClosedChannelException ignored) {
+ logger.warn("someone interrupt current thread, so no need to do write
for io safety");
+ }
+ }
+
+ @Override
+ public void force() throws IOException {
+ if (channel != null && channel.isOpen()) {
+ channel.force(true);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (channel != null) {
+ if (channel.isOpen()) {
+ channel.force(false);
+ }
+ fileOutputStream.close();
+ fileOutputStream = null;
+ channel.close();
+ channel = null;
+ }
+ }
+
+ public void clearFile() throws IOException {
+ close();
+ Files.delete(this.logFile.toPath());
+ if (!logFile.createNewFile()) {
+ logger.warn("Partition log file has existed,filePath:{}",
logFile.getAbsolutePath());
+ }
+ fileOutputStream = new FileOutputStream(logFile, true);
+ channel = fileOutputStream.getChannel();
+ }
+
+ @Override
+ public String toString() {
+ return "LogWriter{" + "logFile=" + logFile + '}';
+ }
+
+ public File getLogFile() {
+ return logFile;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index 72b4237c868..c7af6e2e518 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
@@ -285,6 +286,7 @@ public class UnsealedTsFileRecoverPerformer extends
AbstractTsFileRecoverPerform
// currently, we close this file anyway
writer.endFile();
tsFileResource.serialize();
+
FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource);
} catch (IOException | ExecutionException e) {
throw new WALRecoverException(e);
} catch (InterruptedException e) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
index 880a5732bc7..20b3dfe378a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
@@ -205,7 +205,7 @@ public class LastFlushTimeMapTest {
unseqResource1.setFile(unseqResourceFile1);
unseqResource1.updateStartTime(device, 1);
unseqResource1.updateEndTime(device, 100);
- dataRegion.updateLastFlushTime(unseqResource1);
+ dataRegion.updateDeviceLastFlushTime(unseqResource1);
File unseqResourceFile2 = new File(unseqDirPath + File.separator +
"5-5-0-0.tsfile.resource");
TsFileResource unseqResource2 = new TsFileResource();
@@ -213,7 +213,7 @@ public class LastFlushTimeMapTest {
unseqResource2.setFile(unseqResourceFile2);
unseqResource2.updateStartTime(device, 1);
unseqResource2.updateEndTime(device, 10);
- dataRegion.updateLastFlushTime(unseqResource2);
+ dataRegion.updateDeviceLastFlushTime(unseqResource2);
File unseqResourceFile3 = new File(unseqDirPath + File.separator +
"6-6-0-0.tsfile.resource");
TsFileResource unseqResource3 = new TsFileResource();
@@ -221,7 +221,7 @@ public class LastFlushTimeMapTest {
unseqResource3.setFile(unseqResourceFile3);
unseqResource3.updateStartTime(device, 1);
unseqResource3.updateEndTime(device, 70);
- dataRegion.updateLastFlushTime(unseqResource3);
+ dataRegion.updateDeviceLastFlushTime(unseqResource3);
Assert.assertEquals(100,
dataRegion.getLastFlushTimeMap().getFlushedTime(0, device));
dataRegion.getLastFlushTimeMap().degradeLastFlushTime(0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 04443ba99fe..1f160e07758 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
@@ -224,6 +225,7 @@ public class EnvironmentUtils {
for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
cleanDir(path);
}
+ FileTimeIndexCacheRecorder.getInstance().close();
// delete system info
cleanDir(config.getSystemDir());
// delete query
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e731f50628c..102022417e9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -183,6 +183,8 @@ public enum ThreadName {
REGION_MIGRATE("Region-Migrate-Pool"),
STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
REPAIR_DATA("RepairData"),
+ FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"),
+
// the unknown thread name is used for metrics
UNKOWN("UNKNOWN");