This is an automated email from the ASF dual-hosted git repository.
yschengzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new afde755d4f7 [IOTDB-6273] Load: Memory Management Framework (#11756)
afde755d4f7 is described below
commit afde755d4f7d68013282e8e83701b4e20ada9e48
Author: Itami Sho <[email protected]>
AuthorDate: Fri Dec 29 19:45:21 2023 +0800
[IOTDB-6273] Load: Memory Management Framework (#11756)
* done
* fix
* refactor tsfile analyze schema
* add memory control for data cache
* add release policy for data cache memory block
* fix iterator
* try to load liu zhen dataset
* fix send data and improve writing deletions
* improve deletion data writing on load tsfile manager
* done
* fix
* refactor tsfile analyze schema
* add memory control for data cache
* add release policy for data cache memory block
* fix iterator
* try to load liu zhen dataset
* fix send data and improve writing deletions
* improve deletion data writing on load tsfile manager
* add LoadMemoryMetrics
* remove debug log
* fix config
* queryEngine provides tryAllocate, forceAllocate & release methods for load
* fix UsedMemorySize calculation inaccuracy
* adjust args
---------
Co-authored-by: yschengzi <[email protected]>
---
.../it/env/cluster/config/MppDataNodeConfig.java | 9 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 6 +
.../apache/iotdb/itbase/env/DataNodeConfig.java | 3 +
.../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 4 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 57 ++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +-
.../exception/LoadRuntimeOutOfMemoryException.java | 15 +-
.../db/queryengine/execution/load/ChunkData.java | 3 +
.../queryengine/execution/load/DeletionData.java | 17 +-
.../execution/load/LoadTsFileManager.java | 51 +++-
.../db/queryengine/execution/load/TsFileData.java | 3 -
.../load/LoadTsFileAbstractMemoryBlock.java | 66 +++++
.../load/LoadTsFileAnalyzeSchemaMemoryBlock.java | 98 +++++++
.../load/LoadTsFileDataCacheMemoryBlock.java | 147 +++++++++++
.../queryengine/load/LoadTsFileMemoryManager.java | 149 +++++++++++
.../queryengine/metric/LoadTsFileMemMetricSet.java | 101 ++++++++
.../plan/analyze/LoadTsfileAnalyzer.java | 284 +++++++++++++++++----
.../plan/planner/LocalExecutionPlanner.java | 33 ++-
.../plan/scheduler/load/LoadTsFileScheduler.java | 34 ++-
.../db/service/metrics/DataNodeMetricsHelper.java | 4 +
.../dataregion/modification/Deletion.java | 4 +
.../dataregion/modification/ModificationFile.java | 19 ++
.../io/LocalTextModificationAccessor.java | 9 +-
.../modification/io/ModificationWriter.java | 9 +
.../iotdb/commons/service/metric/enums/Metric.java | 3 +-
...leSequenceReaderTimeseriesMetadataIterator.java | 42 ++-
...quenceReaderTimeseriesMetadataIteratorTest.java | 8 +-
27 files changed, 1076 insertions(+), 113 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index c188ad388c4..0f22e0d4286 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -67,4 +67,13 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
properties.setProperty("dn_connection_timeout_ms",
String.valueOf(connectionTimeoutInMS));
return this;
}
+
+ @Override
+ public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+ long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
+ properties.setProperty(
+ "load_tsfile_analyze_schema_memory_size_in_bytes",
+ String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes));
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 19f0ab36132..fe89997bc41 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -37,4 +37,10 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
public DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS) {
return this;
}
+
+ @Override
+ public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+ long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index db39fa62b42..2887b0a9871 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -28,4 +28,7 @@ public interface DataNodeConfig {
DataNodeConfig setEnableRestService(boolean enableRestService);
DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS);
+
+ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+ long loadTsFileAnalyzeSchemaMemorySizeInBytes);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
index b449afc553b..d887bc5a53b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java
@@ -65,6 +65,7 @@ public class IOTDBLoadTsFileIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IOTDBLoadTsFileIT.class);
private static final long PARTITION_INTERVAL = 10 * 1000L;
private static final int connectionTimeoutInMS = (int)
TimeUnit.SECONDS.toMillis(300);
+ private static final long loadTsFileAnalyzeSchemaMemorySizeInBytes = 10 *
1024L;
private File tmpDir;
@@ -75,7 +76,8 @@ public class IOTDBLoadTsFileIT {
EnvFactory.getEnv()
.getConfig()
.getDataNodeConfig()
- .setConnectionTimeoutInMS(connectionTimeoutInMS);
+ .setConnectionTimeoutInMS(connectionTimeoutInMS)
+
.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);
EnvFactory.getEnv().initClusterEnvironment();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5cc59bd4837..a12192cf29c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -174,8 +174,6 @@ public class IoTDBConfig {
/** The proportion of write memory for loading TsFile */
private double loadTsFileProportion = 0.125;
- private int maxLoadingTimeseriesNumber = 2000;
-
/**
* If memory cost of data region increased more than proportion of
{@linkplain
* IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
@@ -235,6 +233,7 @@ public class IoTDBConfig {
/** The period when outdated wal files are periodically deleted. Unit:
millisecond */
private volatile long deleteWalFilesPeriodInMs = 20 * 1000L;
+
// endregion
/**
@@ -543,6 +542,7 @@ public class IoTDBConfig {
* tasks containing mods files are selected first.
*/
private long innerCompactionTaskSelectionModsFileThreshold = 10 * 1024 *
1024L;
+
/**
* When disk availability is lower than the sum of
(disk_space_warning_threshold +
* inner_compaction_task_selection_disk_redundancy), inner compaction tasks
containing mods files
@@ -968,6 +968,7 @@ public class IoTDBConfig {
/** Number of queues per forwarding trigger */
private int triggerForwardMaxQueueNumber = 8;
+
/** The length of one of the queues per forwarding trigger */
private int triggerForwardMaxSizePerQueue = 2000;
@@ -1093,6 +1094,15 @@ public class IoTDBConfig {
private int maxPendingBatchesNum = 5;
private double maxMemoryRatioForQueue = 0.6;
+ /** Load related */
+ private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
+
+ private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
+ 0; // 0 means that the decision will be adaptive based on the number of
sequences
+
+ private long loadMemoryAllocateRetryIntervalMs = 1000;
+ private int loadMemoryAllocateMaxRetries = 5;
+
/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during
querying */
private String[] pipeReceiverFileDirs = new String[0];
@@ -3273,14 +3283,6 @@ public class IoTDBConfig {
return loadTsFileProportion;
}
- public int getMaxLoadingTimeseriesNumber() {
- return maxLoadingTimeseriesNumber;
- }
-
- public void setMaxLoadingTimeseriesNumber(int maxLoadingTimeseriesNumber) {
- this.maxLoadingTimeseriesNumber = maxLoadingTimeseriesNumber;
- }
-
public static String getEnvironmentVariables() {
return "\n\t"
+ IoTDBConstant.IOTDB_HOME
@@ -3735,6 +3737,41 @@ public class IoTDBConfig {
return modeMapSizeThreshold;
}
+ public int getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber() {
+ return loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
+ }
+
+ public void setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
+ int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber) {
+ this.loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber =
+ loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
+ }
+
+ public long getLoadTsFileAnalyzeSchemaMemorySizeInBytes() {
+ return loadTsFileAnalyzeSchemaMemorySizeInBytes;
+ }
+
+ public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+ long loadTsFileAnalyzeSchemaMemorySizeInBytes) {
+ this.loadTsFileAnalyzeSchemaMemorySizeInBytes =
loadTsFileAnalyzeSchemaMemorySizeInBytes;
+ }
+
+ public long getLoadMemoryAllocateRetryIntervalMs() {
+ return loadMemoryAllocateRetryIntervalMs;
+ }
+
+ public void setLoadMemoryAllocateRetryIntervalMs(long
loadMemoryAllocateRetryIntervalMs) {
+ this.loadMemoryAllocateRetryIntervalMs = loadMemoryAllocateRetryIntervalMs;
+ }
+
+ public int getLoadMemoryAllocateMaxRetries() {
+ return loadMemoryAllocateMaxRetries;
+ }
+
+ public void setLoadMemoryAllocateMaxRetries(int
loadMemoryAllocateMaxRetries) {
+ this.loadMemoryAllocateMaxRetries = loadMemoryAllocateMaxRetries;
+ }
+
public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6a8eb822521..494e9d5d1c4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -893,11 +893,16 @@ public class IoTDBDescriptor {
conf.setIntoOperationExecutionThreadCount(2);
}
- conf.setMaxLoadingTimeseriesNumber(
+ conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
Integer.parseInt(
properties.getProperty(
- "max_loading_timeseries_number",
- String.valueOf(conf.getMaxLoadingTimeseriesNumber()))));
+ "load_tsfile_analyze_schema_batch_flush_time_series_number",
+
String.valueOf(conf.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber()))));
+ conf.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
+ Long.parseLong(
+ properties.getProperty(
+ "load_tsfile_analyze_schema_memory_size_in_bytes",
+
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir",
conf.getExtPipeDir()).trim());
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java
similarity index 67%
copy from
integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java
index db39fa62b42..050274e9940 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java
@@ -17,15 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.itbase.env;
+package org.apache.iotdb.db.exception;
-import java.util.List;
-
-/** This interface is used to handle properties in iotdb-datanode.properties.
*/
-public interface DataNodeConfig {
- DataNodeConfig setMetricReporterType(List<String> metricReporterTypes);
-
- DataNodeConfig setEnableRestService(boolean enableRestService);
-
- DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS);
+public class LoadRuntimeOutOfMemoryException extends RuntimeException {
+ public LoadRuntimeOutOfMemoryException(String message) {
+ super(message);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
index a30a62e2b16..fde207266e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
import java.io.InputStream;
@@ -45,6 +46,8 @@ public interface ChunkData extends TsFileData {
void writeDecodePage(long[] times, Object[] values, int satisfiedLength)
throws IOException;
+ void writeToFileWriter(TsFileIOWriter writer) throws IOException;
+
@Override
default boolean isModification() {
return false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
index 3f486af4adb..18bb50c9c6c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java
@@ -23,11 +23,9 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -40,18 +38,13 @@ public class DeletionData implements TsFileData {
@Override
public long getDataSize() {
- return Long.BYTES;
+ return deletion.getSerializedSize();
}
- @Override
- public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
- File tsFile = writer.getFile();
- try (ModificationFile modificationFile =
- new ModificationFile(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX)) {
- writer.flush();
- deletion.setFileOffset(tsFile.length());
- modificationFile.write(deletion);
- }
+ public void writeToModificationFile(ModificationFile modificationFile, long
fileOffset)
+ throws IOException {
+ deletion.setFileOffset(fileOffset);
+ modificationFile.writeWithoutSync(deletion);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 7c92b327a39..8b8eb97946c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -132,7 +133,7 @@ public class LoadTsFileManager {
writerManager.write(
new DataPartitionInfo(dataRegion,
chunkData.getTimePartitionSlot()), chunkData);
} else {
- writerManager.writeDeletion(tsFileData);
+ writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
}
}
}
@@ -198,12 +199,14 @@ public class LoadTsFileManager {
private final File taskDir;
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
+ private Map<DataPartitionInfo, ModificationFile>
dataPartition2ModificationFile;
private boolean isClosed;
private TsFileWriterManager(File taskDir) {
this.taskDir = taskDir;
this.dataPartition2Writer = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
+ this.dataPartition2ModificationFile = new HashMap<>();
this.isClosed = false;
clearDir(taskDir);
@@ -245,12 +248,32 @@ public class LoadTsFileManager {
chunkData.writeToFileWriter(writer);
}
- private void writeDeletion(TsFileData deletionData) throws IOException {
+ private void writeDeletion(DataRegion dataRegion, DeletionData
deletionData)
+ throws IOException {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
dataPartition2Writer.entrySet()) {
- deletionData.writeToFileWriter(entry.getValue());
+ final DataPartitionInfo partitionInfo = entry.getKey();
+ if (partitionInfo.getDataRegion().equals(dataRegion)) {
+ final TsFileIOWriter writer = entry.getValue();
+ if (!dataPartition2ModificationFile.containsKey(partitionInfo)) {
+ File newModificationFile =
+ SystemFileFactory.INSTANCE.getFile(
+ writer.getFile().getAbsolutePath() +
ModificationFile.FILE_SUFFIX);
+ if (!newModificationFile.createNewFile()) {
+ LOGGER.error(
+ "Can not create ModificationFile {} for writing.",
newModificationFile.getPath());
+ return;
+ }
+
+ dataPartition2ModificationFile.put(
+ partitionInfo, new
ModificationFile(newModificationFile.getAbsolutePath()));
+ }
+ ModificationFile modificationFile =
dataPartition2ModificationFile.get(partitionInfo);
+ writer.flush();
+ deletionData.writeToModificationFile(modificationFile,
writer.getFile().length());
+ }
}
}
@@ -258,6 +281,10 @@ public class LoadTsFileManager {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
+ for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+ dataPartition2ModificationFile.entrySet()) {
+ entry.getValue().close();
+ }
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
dataPartition2Writer.entrySet()) {
TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
@@ -302,7 +329,7 @@ public class LoadTsFileManager {
if (dataPartition2Writer != null) {
for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
dataPartition2Writer.entrySet()) {
try {
- TsFileIOWriter writer = entry.getValue();
+ final TsFileIOWriter writer = entry.getValue();
if (writer.canWrite()) {
writer.close();
}
@@ -315,6 +342,21 @@ public class LoadTsFileManager {
}
}
}
+ if (dataPartition2ModificationFile != null) {
+ for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+ dataPartition2ModificationFile.entrySet()) {
+ try {
+ final ModificationFile modificationFile = entry.getValue();
+ modificationFile.close();
+ final Path modificationFilePath = new
File(modificationFile.getFilePath()).toPath();
+ if (Files.exists(modificationFilePath)) {
+ Files.delete(modificationFilePath);
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Close ModificationFile {} error.",
entry.getValue().getFilePath(), e);
+ }
+ }
+ }
try {
Files.delete(taskDir.toPath());
} catch (DirectoryNotEmptyException e) {
@@ -324,6 +366,7 @@ public class LoadTsFileManager {
}
dataPartition2Writer = null;
dataPartition2LastDevice = null;
+ dataPartition2ModificationFile = null;
isClosed = true;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
index a53a3f5fc30..c5df4f3f8d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.execution.load;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,8 +30,6 @@ import java.io.InputStream;
public interface TsFileData {
long getDataSize();
- void writeToFileWriter(TsFileIOWriter writer) throws IOException;
-
boolean isModification();
void serialize(DataOutputStream stream) throws IOException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
new file mode 100644
index 00000000000..4f2ad607ccb
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class LoadTsFileAbstractMemoryBlock implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileAbstractMemoryBlock.class);
+ protected static final LoadTsFileMemoryManager MEMORY_MANAGER =
+ LoadTsFileMemoryManager.getInstance();
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ public boolean hasEnoughMemory() {
+ return hasEnoughMemory(0L);
+ }
+
+ public abstract boolean hasEnoughMemory(long memoryTobeAddedInBytes);
+
+ public abstract void addMemoryUsage(long memoryInBytes);
+
+ public abstract void reduceMemoryUsage(long memoryInBytes);
+
+ /**
+ * Release all memory of this block.
+ *
+ * <p>NOTE: This method should be called only by {@link
LoadTsFileAbstractMemoryBlock#close()}.
+ */
+ protected abstract void releaseAllMemory();
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+
+ @Override
+ public void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ try {
+ releaseAllMemory();
+ } catch (Exception e) {
+ LOGGER.error("Release memory block {} failed", this, e);
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
new file mode 100644
index 00000000000..17aa4150bcd
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemoryBlock {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class);
+
+ private final long totalMemorySizeInBytes;
+ private final AtomicLong memoryUsageInBytes;
+
+ LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) {
+ super();
+
+ this.totalMemorySizeInBytes = totalMemorySizeInBytes;
+ this.memoryUsageInBytes = new AtomicLong(0);
+ }
+
+ @Override
+ public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+ return memoryUsageInBytes.get() + memoryTobeAddedInBytes <=
totalMemorySizeInBytes;
+ }
+
+ @Override
+ public void addMemoryUsage(long memoryInBytes) {
+ memoryUsageInBytes.addAndGet(memoryInBytes);
+
+ MetricService.getInstance()
+ .getOrCreateGauge(
+ Metric.LOAD_MEM.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
+ .incr(memoryInBytes);
+ }
+
+ @Override
+ public void reduceMemoryUsage(long memoryInBytes) {
+ if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
+ LOGGER.warn("{} has reduce memory usage to negative", this);
+ }
+
+ MetricService.getInstance()
+ .getOrCreateGauge(
+ Metric.LOAD_MEM.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
+ .decr(memoryInBytes);
+ }
+
+ @Override
+ protected void releaseAllMemory() {
+ if (memoryUsageInBytes.get() != 0) {
+ LOGGER.warn(
+ "Try to release memory from a memory block {} which has not released
all memory", this);
+ }
+ MEMORY_MANAGER.releaseToQuery(totalMemorySizeInBytes);
+ }
+
+ @Override
+ public String toString() {
+ return "LoadTsFileAnalyzeSchemaMemoryBlock{"
+ + "totalMemorySizeInBytes="
+ + totalMemorySizeInBytes
+ + ", memoryUsageInBytes="
+ + memoryUsageInBytes.get()
+ + '}';
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
new file mode 100644
index 00000000000..7a22951d672
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBlock {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(LoadTsFileDataCacheMemoryBlock.class);
+ private static final long MINIMUM_MEMORY_SIZE_IN_BYTES = 1024 * 1024L; // 1
MB
+ private static final int MAX_ASK_FOR_MEMORY_COUNT = 256; // must be a power
of 2
+ private static final long EACH_ASK_MEMORY_SIZE_IN_BYTES =
+ Math.max(
+ MINIMUM_MEMORY_SIZE_IN_BYTES,
+ LoadTsFileMemoryManager.MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 4);
+
+ private final AtomicLong limitedMemorySizeInBytes;
+ private final AtomicLong memoryUsageInBytes;
+ private final AtomicInteger askForMemoryCount;
+ private final AtomicInteger referenceCount;
+
+ LoadTsFileDataCacheMemoryBlock(long initialLimitedMemorySizeInBytes) {
+ super();
+
+ if (initialLimitedMemorySizeInBytes < MINIMUM_MEMORY_SIZE_IN_BYTES) {
+ throw new LoadRuntimeOutOfMemoryException(
+ String.format(
+ "The initial limited memory size %d is less than the minimum
memory size %d",
+ initialLimitedMemorySizeInBytes, MINIMUM_MEMORY_SIZE_IN_BYTES));
+ }
+
+ this.limitedMemorySizeInBytes = new
AtomicLong(initialLimitedMemorySizeInBytes);
+ this.memoryUsageInBytes = new AtomicLong(0L);
+ this.askForMemoryCount = new AtomicInteger(1);
+ this.referenceCount = new AtomicInteger(0);
+ }
+
+ @Override
+ public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+ return memoryUsageInBytes.get() + memoryTobeAddedInBytes <=
limitedMemorySizeInBytes.get();
+ }
+
+ @Override
+ public void addMemoryUsage(long memoryInBytes) {
+ memoryUsageInBytes.addAndGet(memoryInBytes);
+
+ askForMemoryCount.getAndUpdate(
+ count -> {
+ if ((count & (count - 1)) == 0) {
+ // count is a power of 2
+ long actuallyAllocateMemorySizeInBytes =
+
MEMORY_MANAGER.tryAllocateFromQuery(EACH_ASK_MEMORY_SIZE_IN_BYTES);
+
limitedMemorySizeInBytes.addAndGet(actuallyAllocateMemorySizeInBytes);
+ if (actuallyAllocateMemorySizeInBytes <
EACH_ASK_MEMORY_SIZE_IN_BYTES) {
+ return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
+ } else {
+ return 1;
+ }
+ }
+ return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
+ });
+ }
+
+ @Override
+ public void reduceMemoryUsage(long memoryInBytes) {
+ memoryUsageInBytes.addAndGet(-memoryInBytes);
+ }
+
+ @Override
+ protected void releaseAllMemory() {
+ if (memoryUsageInBytes.get() != 0) {
+ LOGGER.warn(
+ "Try to release memory from a memory block {} which has not released
all memory", this);
+ }
+ MEMORY_MANAGER.releaseToQuery(limitedMemorySizeInBytes.get());
+ }
+
+ public boolean doShrink(long shrinkMemoryInBytes) {
+ if (shrinkMemoryInBytes < 0) {
+ LOGGER.warn(
+ "Try to shrink a negative memory size {} from memory block {}",
+ shrinkMemoryInBytes,
+ this);
+ return false;
+ } else if (shrinkMemoryInBytes == 0) {
+ return true;
+ }
+
+ if (limitedMemorySizeInBytes.get() - shrinkMemoryInBytes <=
MINIMUM_MEMORY_SIZE_IN_BYTES) {
+ return false;
+ }
+
+ limitedMemorySizeInBytes.addAndGet(-shrinkMemoryInBytes);
+ return true;
+ }
+
+ void updateReferenceCount(int delta) {
+ referenceCount.addAndGet(delta);
+ }
+
+ int getReferenceCount() {
+ return referenceCount.get();
+ }
+
+ long getMemoryUsageInBytes() {
+ return memoryUsageInBytes.get();
+ }
+
+ long getLimitedMemorySizeInBytes() {
+ return limitedMemorySizeInBytes.get();
+ }
+
+ @Override
+ public String toString() {
+ return "LoadTsFileDataCacheMemoryBlock{"
+ + "limitedMemorySizeInBytes="
+ + limitedMemorySizeInBytes.get()
+ + ", memoryUsageInBytes="
+ + memoryUsageInBytes.get()
+ + ", askForMemoryCount="
+ + askForMemoryCount.get()
+ + '}';
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
new file mode 100644
index 00000000000..aa4d28bbd02
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.load;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LoadTsFileMemoryManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileMemoryManager.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final LocalExecutionPlanner QUERY_ENGINE_MEMORY_MANAGER =
+ LocalExecutionPlanner.getInstance();
+ public static final long MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES =
+ QUERY_ENGINE_MEMORY_MANAGER.getAllocateMemoryForOperators();
+ private static final int MEMORY_ALLOCATE_MAX_RETRIES =
CONFIG.getLoadMemoryAllocateMaxRetries();
+ private static final long MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS =
+ CONFIG.getLoadMemoryAllocateRetryIntervalMs();
+
+ private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0);
+ private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock;
+
+ private synchronized void forceAllocatedFromQuery(long sizeInBytes)
+ throws LoadRuntimeOutOfMemoryException {
+ for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
+ // allocate memory from queryEngine
+ if
(QUERY_ENGINE_MEMORY_MANAGER.forceAllocateFreeMemoryForOperators(sizeInBytes)) {
+ usedMemorySizeInBytes.addAndGet(sizeInBytes);
+ return;
+ }
+
+ // wait for available memory
+ try {
+ this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("forceAllocate: interrupted while waiting for available
memory", e);
+ }
+ }
+
+ throw new LoadRuntimeOutOfMemoryException(
+ String.format(
+ "forceAllocate: failed to allocate memory from query engine after
%d retries, "
+ + "total query memory %s, used memory size %d bytes, "
+ + "requested memory size %d bytes",
+ MEMORY_ALLOCATE_MAX_RETRIES,
+
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(),
+ usedMemorySizeInBytes.get(),
+ sizeInBytes));
+ }
+
+ public synchronized long tryAllocateFromQuery(long sizeInBytes) {
+ long actuallyAllocateMemoryInBytes =
+ Math.max(0L,
QUERY_ENGINE_MEMORY_MANAGER.tryAllocateFreeMemoryForOperators(sizeInBytes));
+ usedMemorySizeInBytes.addAndGet(actuallyAllocateMemoryInBytes);
+ return actuallyAllocateMemoryInBytes;
+ }
+
+ public synchronized void releaseToQuery(long sizeInBytes) {
+ usedMemorySizeInBytes.addAndGet(-sizeInBytes);
+ QUERY_ENGINE_MEMORY_MANAGER.releaseToFreeMemoryForOperators(sizeInBytes);
+ this.notifyAll();
+ }
+
+ public synchronized LoadTsFileAnalyzeSchemaMemoryBlock
allocateAnalyzeSchemaMemoryBlock(
+ long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
+ try {
+ forceAllocatedFromQuery(sizeInBytes);
+ } catch (LoadRuntimeOutOfMemoryException e) {
+ if (dataCacheMemoryBlock != null &&
dataCacheMemoryBlock.doShrink(sizeInBytes)) {
+ return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
+ }
+ throw e;
+ }
+ return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
+ }
+
+ public synchronized LoadTsFileDataCacheMemoryBlock
allocateDataCacheMemoryBlock()
+ throws LoadRuntimeOutOfMemoryException {
+ if (dataCacheMemoryBlock == null) {
+ long actuallyAllocateMemoryInBytes =
+ tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
+ dataCacheMemoryBlock = new
LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
+ LOGGER.info(
+ "Create Data Cache Memory Block {}, allocate memory {}",
+ dataCacheMemoryBlock,
+ actuallyAllocateMemoryInBytes);
+ }
+ dataCacheMemoryBlock.updateReferenceCount(1);
+ return dataCacheMemoryBlock;
+ }
+
+ public synchronized void releaseDataCacheMemoryBlock() {
+ dataCacheMemoryBlock.updateReferenceCount(-1);
+ if (dataCacheMemoryBlock.getReferenceCount() == 0) {
+ LOGGER.info("Release Data Cache Memory Block {}", dataCacheMemoryBlock);
+ dataCacheMemoryBlock.close();
+ dataCacheMemoryBlock = null;
+ }
+ }
+
+ // used for Metrics
+ public long getUsedMemorySizeInBytes() {
+ return usedMemorySizeInBytes.get();
+ }
+
+ public long getDataCacheUsedMemorySizeInBytes() {
+ return dataCacheMemoryBlock == null ? 0 :
dataCacheMemoryBlock.getMemoryUsageInBytes();
+ }
+
+ public long getDataCacheLimitedMemorySizeInBytes() {
+ return dataCacheMemoryBlock == null ? 0 :
dataCacheMemoryBlock.getLimitedMemorySizeInBytes();
+ }
+
+ ///////////////////////////// SINGLETON /////////////////////////////
+ private LoadTsFileMemoryManager() {}
+
+ public static LoadTsFileMemoryManager getInstance() {
+ return LoadTsFileMemoryManagerHolder.INSTANCE;
+ }
+
+ public static class LoadTsFileMemoryManagerHolder {
+ private static final LoadTsFileMemoryManager INSTANCE = new
LoadTsFileMemoryManager();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
new file mode 100644
index 00000000000..7e822421302
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class LoadTsFileMemMetricSet implements IMetricSet {
+
+ private static final String LOAD_TSFILE_USED_MEMORY = "LoadTsFileUsedMemory";
+ public static final String LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY =
"LoadTsFileAnalyzeSchemaMemory";
+
+ private static final String LOAD_TSFILE_DATA_CACHE_MEMORY =
"LoadTsFileDataCacheMemory";
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ metricService.createAutoGauge(
+ Metric.LOAD_MEM.toString(),
+ MetricLevel.IMPORTANT,
+ LoadTsFileMemoryManager.getInstance(),
+ LoadTsFileMemoryManager::getUsedMemorySizeInBytes,
+ Tag.NAME.toString(),
+ LOAD_TSFILE_USED_MEMORY);
+
+ metricService.createAutoGauge(
+ Metric.LOAD_MEM.toString(),
+ MetricLevel.IMPORTANT,
+ LoadTsFileMemoryManager.getInstance(),
+ LoadTsFileMemoryManager::getDataCacheUsedMemorySizeInBytes,
+ Tag.NAME.toString(),
+ LOAD_TSFILE_DATA_CACHE_MEMORY);
+
+ metricService
+ .getOrCreateGauge(
+ Metric.LOAD_MEM.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
+ .set(0L);
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.LOAD_MEM.toString(),
+ Tag.NAME.toString(),
+ LOAD_TSFILE_USED_MEMORY);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.LOAD_MEM.toString(),
+ Tag.NAME.toString(),
+ LOAD_TSFILE_DATA_CACHE_MEMORY);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.LOAD_MEM.toString(),
+ Tag.NAME.toString(),
+ LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY);
+ }
+
+ //////////////////////////// singleton ////////////////////////////
+
+ private static class LoadTsFileMemMetricSetHolder {
+
+ private static final LoadTsFileMemMetricSet INSTANCE = new
LoadTsFileMemMetricSet();
+
+ private LoadTsFileMemMetricSetHolder() {
+ // empty constructor
+ }
+ }
+
+ public static LoadTsFileMemMetricSet getInstance() {
+ return LoadTsFileMemMetricSet.LoadTsFileMemMetricSetHolder.INSTANCE;
+ }
+
+ private LoadTsFileMemMetricSet() {
+ // empty constructor
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 57b15dbdc02..84860f5d847 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -33,9 +33,11 @@ import
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
+import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.VerifyMetadataException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -45,6 +47,8 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileAnalyzeSchemaMemoryBlock;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
@@ -81,6 +85,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -92,6 +97,19 @@ public class LoadTsfileAnalyzer {
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
+ private static final int BATCH_FLUSH_TIME_SERIES_NUMBER;
+ private static final long ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES;
+ private static final long FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES;
+
+ static {
+ final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ BATCH_FLUSH_TIME_SERIES_NUMBER =
CONFIG.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber();
+ ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES =
+ CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes() <= 0
+ ? ((long) BATCH_FLUSH_TIME_SERIES_NUMBER) << 10
+ : CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes();
+ FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES =
ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES >> 1;
+ }
private final LoadTsFileStatement loadTsFileStatement;
private final MPPQueryContext context;
@@ -99,8 +117,7 @@ public class LoadTsfileAnalyzer {
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
- private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier =
- new SchemaAutoCreatorAndVerifier();
+ private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
LoadTsfileAnalyzer(
LoadTsFileStatement loadTsFileStatement,
@@ -112,6 +129,16 @@ public class LoadTsfileAnalyzer {
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
+
+ try {
+ this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
+ } catch (LoadRuntimeOutOfMemoryException e) {
+ LOGGER.warn("Can not allocate memory for analyze TsFile schema.", e);
+ throw new SemanticException(
+ String.format(
+ "Can not allocate memory for analyze TsFile schema when
executing statement %s.",
+ loadTsFileStatement));
+ }
}
public Analysis analyzeFileByFile() {
@@ -150,15 +177,16 @@ public class LoadTsfileAnalyzer {
i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 /
tsfileNum));
}
} catch (IllegalArgumentException e) {
- schemaAutoCreatorAndVerifier.clear();
+ schemaAutoCreatorAndVerifier.close();
LOGGER.warn(
"Parse file {} to resource error, this TsFile maybe empty.",
tsFile.getPath(), e);
throw new SemanticException(
String.format("TsFile %s is empty or incomplete.",
tsFile.getPath()));
} catch (AuthException e) {
+ schemaAutoCreatorAndVerifier.close();
return createFailAnalysisForAuthException(e);
} catch (Exception e) {
- schemaAutoCreatorAndVerifier.clear();
+ schemaAutoCreatorAndVerifier.close();
LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e);
throw new SemanticException(
String.format(
@@ -168,9 +196,10 @@ public class LoadTsfileAnalyzer {
try {
schemaAutoCreatorAndVerifier.flush();
- schemaAutoCreatorAndVerifier.clear();
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
+ } finally {
+ schemaAutoCreatorAndVerifier.close();
}
LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
@@ -185,7 +214,7 @@ public class LoadTsfileAnalyzer {
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
- new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true);
+ new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true, 1);
long writePointCount = 0;
@@ -239,7 +268,6 @@ public class LoadTsfileAnalyzer {
}
private Analysis createFailAnalysisForAuthException(AuthException e) {
- schemaAutoCreatorAndVerifier.clear();
Analysis analysis = new Analysis();
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
@@ -247,14 +275,11 @@ public class LoadTsfileAnalyzer {
}
private final class SchemaAutoCreatorAndVerifier {
+ private final LoadTsFileAnalyzeSchemaCache schemaCache;
- private final Map<String, Boolean> tsfileDevice2IsAligned = new
HashMap<>();
- private final Map<String, Set<MeasurementSchema>>
currentBatchDevice2TimeseriesSchemas =
- new HashMap<>();
-
- private final Set<PartialPath> alreadySetDatabases = new HashSet<>();
-
- private SchemaAutoCreatorAndVerifier() {}
+ private SchemaAutoCreatorAndVerifier() throws
LoadRuntimeOutOfMemoryException {
+ this.schemaCache = new LoadTsFileAnalyzeSchemaCache();
+ }
public void autoCreateAndVerify(
TsFileSequenceReader reader,
@@ -267,7 +292,9 @@ public class LoadTsfileAnalyzer {
for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
final TSDataType dataType = timeseriesMetadata.getTsDataType();
if (TSDataType.VECTOR.equals(dataType)) {
- tsfileDevice2IsAligned.put(device, true);
+ schemaCache
+ .clearDeviceIsAlignedCacheIfNecessary(); // must execute
before add aligned cache
+ schemaCache.addIsAlignedCache(device, true, false);
// not a timeseries, skip
} else {
@@ -301,21 +328,25 @@ public class LoadTsfileAnalyzer {
}
final Pair<CompressionType, TSEncoding> compressionEncodingPair =
reader.readTimeseriesCompressionTypeAndEncoding(timeseriesMetadata);
- currentBatchDevice2TimeseriesSchemas
- .computeIfAbsent(device, o -> new HashSet<>())
- .add(
- new MeasurementSchema(
- timeseriesMetadata.getMeasurementId(),
- dataType,
- compressionEncodingPair.getRight(),
- compressionEncodingPair.getLeft()));
-
- tsfileDevice2IsAligned.putIfAbsent(device, false);
+ schemaCache.addTimeSeries(
+ device,
+ new MeasurementSchema(
+ timeseriesMetadata.getMeasurementId(),
+ dataType,
+ compressionEncodingPair.getRight(),
+ compressionEncodingPair.getLeft()));
+
+ schemaCache.addIsAlignedCache(device, false, true);
+ if (!schemaCache.getDeviceIsAligned(device)) {
+ schemaCache.clearDeviceIsAlignedCacheIfNecessary();
+ }
+ }
+
+ if (schemaCache.shouldFlushTimeSeries()) {
+ flush();
}
}
}
-
- flush();
}
/**
@@ -326,20 +357,17 @@ public class LoadTsfileAnalyzer {
throws SemanticException, AuthException {
// avoid OOM when loading a tsfile with too many timeseries
// or loading too many tsfiles at the same time
- if (tsfileDevice2IsAligned.size() > 10000) {
- flush();
- tsfileDevice2IsAligned.clear();
- }
+ schemaCache.clearDeviceIsAlignedCacheIfNecessary();
}
public void flush() throws AuthException {
doAutoCreateAndVerify();
- currentBatchDevice2TimeseriesSchemas.clear();
+ schemaCache.clearTimeSeries();
}
private void doAutoCreateAndVerify() throws SemanticException,
AuthException {
- if (currentBatchDevice2TimeseriesSchemas.isEmpty()) {
+ if (schemaCache.getDevice2TimeSeries().isEmpty()) {
return;
}
@@ -372,7 +400,7 @@ public class LoadTsfileAnalyzer {
private void makeSureNoDuplicatedMeasurementsInDevices() throws
VerifyMetadataException {
for (final Map.Entry<String, Set<MeasurementSchema>> entry :
- currentBatchDevice2TimeseriesSchemas.entrySet()) {
+ schemaCache.getDevice2TimeSeries().entrySet()) {
final String device = entry.getKey();
final Map<String, MeasurementSchema> measurement2Schema = new
HashMap<>();
for (final MeasurementSchema timeseriesSchema : entry.getValue()) {
@@ -391,7 +419,7 @@ public class LoadTsfileAnalyzer {
final int databasePrefixNodesLength =
loadTsFileStatement.getDatabaseLevel() + 1;
final Set<PartialPath> databasesNeededToBeSet = new HashSet<>();
- for (final String device :
currentBatchDevice2TimeseriesSchemas.keySet()) {
+ for (final String device : schemaCache.getDevice2TimeSeries().keySet()) {
final PartialPath devicePath = new PartialPath(device);
final String[] devicePrefixNodes = devicePath.getNodes();
@@ -409,7 +437,7 @@ public class LoadTsfileAnalyzer {
}
// 1. filter out the databases that already exist
- if (alreadySetDatabases.isEmpty()) {
+ if (schemaCache.getAlreadySetDatabases().isEmpty()) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TGetDatabaseReq req =
@@ -422,13 +450,13 @@ public class LoadTsfileAnalyzer {
final TShowDatabaseResp resp = configNodeClient.showDatabase(req);
for (final String databaseName : resp.getDatabaseInfoMap().keySet())
{
- alreadySetDatabases.add(new PartialPath(databaseName));
+ schemaCache.addAlreadySetDatabase(new PartialPath(databaseName));
}
} catch (IOException | TException | ClientManagerException e) {
throw new LoadFileException(e);
}
}
- databasesNeededToBeSet.removeAll(alreadySetDatabases);
+ databasesNeededToBeSet.removeAll(schemaCache.getAlreadySetDatabases());
// 2. create the databases that do not exist
for (final PartialPath databasePath : databasesNeededToBeSet) {
@@ -439,7 +467,7 @@ public class LoadTsfileAnalyzer {
statement.setEnablePrintExceptionLog(false);
executeSetDatabaseStatement(statement);
- alreadySetDatabases.add(databasePath);
+ schemaCache.addAlreadySetDatabase(databasePath);
}
}
@@ -484,7 +512,7 @@ public class LoadTsfileAnalyzer {
final List<Boolean> isAlignedList = new ArrayList<>();
for (final Map.Entry<String, Set<MeasurementSchema>> entry :
- currentBatchDevice2TimeseriesSchemas.entrySet()) {
+ schemaCache.getDevice2TimeSeries().entrySet()) {
final int measurementSize = entry.getValue().size();
final String[] measurements = new String[measurementSize];
final TSDataType[] tsDataTypes = new TSDataType[measurementSize];
@@ -504,7 +532,7 @@ public class LoadTsfileAnalyzer {
dataTypeList.add(tsDataTypes);
encodingsList.add(encodings);
compressionTypesList.add(compressionTypes);
- isAlignedList.add(tsfileDevice2IsAligned.get(entry.getKey()));
+ isAlignedList.add(schemaCache.getDeviceIsAligned(entry.getKey()));
}
return SchemaValidator.validate(
@@ -521,7 +549,7 @@ public class LoadTsfileAnalyzer {
private void verifySchema(ISchemaTree schemaTree)
throws VerifyMetadataException, IllegalPathException {
for (final Map.Entry<String, Set<MeasurementSchema>> entry :
- currentBatchDevice2TimeseriesSchemas.entrySet()) {
+ schemaCache.getDevice2TimeSeries().entrySet()) {
final String device = entry.getKey();
final List<MeasurementSchema> tsfileTimeseriesSchemas = new
ArrayList<>(entry.getValue());
final DeviceSchemaInfo iotdbDeviceSchemaInfo =
@@ -540,7 +568,7 @@ public class LoadTsfileAnalyzer {
}
// check device schema: is aligned or not
- final boolean isAlignedInTsFile = tsfileDevice2IsAligned.get(device);
+ final boolean isAlignedInTsFile =
schemaCache.getDeviceIsAligned(device);
final boolean isAlignedInIoTDB = iotdbDeviceSchemaInfo.isAligned();
if (isAlignedInTsFile != isAlignedInIoTDB) {
throw new VerifyMetadataException(
@@ -608,10 +636,174 @@ public class LoadTsfileAnalyzer {
}
}
- public void clear() {
- tsfileDevice2IsAligned.clear();
- currentBatchDevice2TimeseriesSchemas.clear();
+ public void close() {
+ schemaCache.close();
+ }
+ }
+
+ private static class LoadTsFileAnalyzeSchemaCache {
+
+ private final LoadTsFileAnalyzeSchemaMemoryBlock block;
+
+ private Map<String, Set<MeasurementSchema>>
currentBatchDevice2TimeSeriesSchemas;
+ private Map<String, Boolean> tsFileDevice2IsAligned;
+ private Set<PartialPath> alreadySetDatabases;
+
+ private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
+ private long tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0;
+ private long alreadySetDatabasesMemoryUsageSizeInBytes = 0;
+
+ private int currentBatchTimeSeriesCount = 0;
+
+ public LoadTsFileAnalyzeSchemaCache() throws
LoadRuntimeOutOfMemoryException {
+ this.block =
+ LoadTsFileMemoryManager.getInstance()
+
.allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
+ this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
+ this.tsFileDevice2IsAligned = new HashMap<>();
+ this.alreadySetDatabases = new HashSet<>();
+ }
+
+ public Map<String, Set<MeasurementSchema>> getDevice2TimeSeries() {
+ return currentBatchDevice2TimeSeriesSchemas;
+ }
+
+ public boolean getDeviceIsAligned(String device) {
+ if (!tsFileDevice2IsAligned.containsKey(device)) {
+ LOGGER.warn(
+ "Device {} is not in the tsFileDevice2IsAligned cache {}.",
+ device,
+ tsFileDevice2IsAligned);
+ }
+ return tsFileDevice2IsAligned.get(device);
+ }
+
+ public Set<PartialPath> getAlreadySetDatabases() {
+ return alreadySetDatabases;
+ }
+
+ public void addTimeSeries(String device, MeasurementSchema
measurementSchema) {
+ long memoryUsageSizeInBytes = 0;
+ if (!currentBatchDevice2TimeSeriesSchemas.containsKey(device)) {
+ memoryUsageSizeInBytes += estimateStringSize(device);
+ }
+ if (currentBatchDevice2TimeSeriesSchemas
+ .computeIfAbsent(device, k -> new HashSet<>())
+ .add(measurementSchema)) {
+ memoryUsageSizeInBytes += measurementSchema.serializedSize();
+ currentBatchTimeSeriesCount++;
+ }
+
+ if (memoryUsageSizeInBytes > 0) {
+ batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes +=
memoryUsageSizeInBytes;
+ block.addMemoryUsage(memoryUsageSizeInBytes);
+ }
+ }
+
+ public void addIsAlignedCache(String device, boolean isAligned, boolean
addIfAbsent) {
+ long memoryUsageSizeInBytes = 0;
+ if (!tsFileDevice2IsAligned.containsKey(device)) {
+ memoryUsageSizeInBytes += estimateStringSize(device);
+ }
+ if (addIfAbsent
+ ? (tsFileDevice2IsAligned.putIfAbsent(device, isAligned) == null)
+ : (tsFileDevice2IsAligned.put(device, isAligned) == null)) {
+ memoryUsageSizeInBytes += Byte.BYTES;
+ }
+
+ if (memoryUsageSizeInBytes > 0) {
+ tsFileDevice2IsAlignedMemoryUsageSizeInBytes += memoryUsageSizeInBytes;
+ block.addMemoryUsage(memoryUsageSizeInBytes);
+ }
+ }
+
+ public void addAlreadySetDatabase(PartialPath database) {
+ long memoryUsageSizeInBytes = 0;
+ if (alreadySetDatabases.add(database)) {
+ memoryUsageSizeInBytes += PartialPath.estimateSize(database);
+ }
+
+ if (memoryUsageSizeInBytes > 0) {
+ alreadySetDatabasesMemoryUsageSizeInBytes += memoryUsageSizeInBytes;
+ block.addMemoryUsage(memoryUsageSizeInBytes);
+ }
+ }
+
+ public boolean shouldFlushTimeSeries() {
+ return !block.hasEnoughMemory()
+ || currentBatchTimeSeriesCount >= BATCH_FLUSH_TIME_SERIES_NUMBER;
+ }
+
+ public boolean shouldFlushAlignedCache() {
+ return tsFileDevice2IsAlignedMemoryUsageSizeInBytes
+ >= FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES;
+ }
+
+ public void clearTimeSeries() {
+ currentBatchDevice2TimeSeriesSchemas.clear();
+
block.reduceMemoryUsage(batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes);
+ batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
+ currentBatchTimeSeriesCount = 0;
+ }
+
+ public void clearAlignedCache() {
+ tsFileDevice2IsAligned.clear();
+ block.reduceMemoryUsage(tsFileDevice2IsAlignedMemoryUsageSizeInBytes);
+ tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0;
+ }
+
+ public void clearDeviceIsAlignedCacheIfNecessary() {
+ if (!shouldFlushAlignedCache()) {
+ return;
+ }
+
+ long releaseMemoryInBytes = 0;
+ final Set<String> timeSeriesCacheKeySet =
+ new HashSet<>(currentBatchDevice2TimeSeriesSchemas.keySet());
+ Iterator<Map.Entry<String, Boolean>> iterator =
tsFileDevice2IsAligned.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Boolean> entry = iterator.next();
+ if (!timeSeriesCacheKeySet.contains(entry.getKey())) {
+ releaseMemoryInBytes += estimateStringSize(entry.getKey()) +
Byte.BYTES;
+ iterator.remove();
+ }
+ }
+ if (releaseMemoryInBytes > 0) {
+ tsFileDevice2IsAlignedMemoryUsageSizeInBytes -= releaseMemoryInBytes;
+ block.reduceMemoryUsage(releaseMemoryInBytes);
+ }
+ }
+
+ private void clearDatabasesCache() {
alreadySetDatabases.clear();
+ block.reduceMemoryUsage(alreadySetDatabasesMemoryUsageSizeInBytes);
+ alreadySetDatabasesMemoryUsageSizeInBytes = 0;
+ }
+
+ public void close() {
+ clearTimeSeries();
+ clearAlignedCache();
+ clearDatabasesCache();
+
+ block.close();
+
+ currentBatchDevice2TimeSeriesSchemas = null;
+ tsFileDevice2IsAligned = null;
+ alreadySetDatabases = null;
+ }
+
+ /**
+ * String basic total, 32B
+ *
+ * <ul>
+ * <li>Object header, 8B
+ * <li>char[] reference + header + length, 8 + 4 + 8= 20B
+ * <li>hash code, 4B
+ * </ul>
+ */
+ private static int estimateStringSize(String string) {
+ // each char takes 2B in Java
+ return string == null ? 0 : 32 + 2 * string.length();
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index d4d51195607..0b1fc6cecd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -45,10 +45,11 @@ import java.util.List;
public class LocalExecutionPlanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(LocalExecutionPlanner.class);
+ private static final long ALLOCATE_MEMORY_FOR_OPERATORS =
+
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
/** allocated memory for operator execution */
- private long freeMemoryForOperators =
-
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators();
+ private long freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS;
public long getFreeMemoryForOperators() {
return freeMemoryForOperators;
@@ -163,6 +164,34 @@ public class LocalExecutionPlanner {
return sourcePaths;
}
+ public synchronized boolean forceAllocateFreeMemoryForOperators(long
memoryInBytes) {
+ if (freeMemoryForOperators < memoryInBytes) {
+ return false;
+ } else {
+ freeMemoryForOperators -= memoryInBytes;
+ return true;
+ }
+ }
+
+ public synchronized long tryAllocateFreeMemoryForOperators(long
memoryInBytes) {
+ if (freeMemoryForOperators < memoryInBytes) {
+ long result = freeMemoryForOperators;
+ freeMemoryForOperators = 0;
+ return result;
+ } else {
+ freeMemoryForOperators -= memoryInBytes;
+ return memoryInBytes;
+ }
+ }
+
  // Returns previously reserved bytes to the operator memory pool.
  public synchronized void releaseToFreeMemoryForOperators(long memoryInBytes) {
    freeMemoryForOperators += memoryInBytes;
  }
+
  // Total operator memory configured at startup; constant for the lifetime of the process.
  public long getAllocateMemoryForOperators() {
    return ALLOCATE_MEMORY_FOR_OPERATORS;
  }
+
private static class InstanceHolder {
private InstanceHolder() {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 92a3ecd1047..51c4b44df38 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -46,6 +46,8 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
+import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -93,15 +95,17 @@ import java.util.stream.IntStream;
public class LoadTsFileScheduler implements IScheduler {
private static final Logger logger =
LoggerFactory.getLogger(LoadTsFileScheduler.class);
public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 900L; // 15min
- private static final long MAX_MEMORY_SIZE;
+ private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE;
private static final int TRANSMIT_LIMIT;
static {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- MAX_MEMORY_SIZE =
+ SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
Math.min(
config.getThriftMaxFrameSize() >> 2,
- (long) (config.getAllocateMemoryForStorageEngine() *
config.getLoadTsFileProportion()));
+ (long)
+ (config.getAllocateMemoryForStorageEngine()
+ * config.getLoadTsFileProportion())); // TODO: change it
to query engine
TRANSMIT_LIMIT =
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
}
@@ -114,6 +118,7 @@ public class LoadTsFileScheduler implements IScheduler {
private final PlanFragmentId fragmentId;
private final Set<TRegionReplicaSet> allReplicaSets;
private final boolean isGeneratedByPipe;
+ private final LoadTsFileDataCacheMemoryBlock block;
public LoadTsFileScheduler(
DistributedQueryPlan distributedQueryPlan,
@@ -130,6 +135,7 @@ public class LoadTsFileScheduler implements IScheduler {
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
this.allReplicaSets = new HashSet<>();
this.isGeneratedByPipe = isGeneratedByPipe;
+ this.block =
LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();
for (FragmentInstance fragmentInstance :
distributedQueryPlan.getInstances()) {
tsFileNodeList.add((LoadSingleTsFileNode)
fragmentInstance.getFragment().getPlanNodeTree());
@@ -200,10 +206,11 @@ public class LoadTsFileScheduler implements IScheduler {
if (isLoadSuccess) {
stateMachine.transitionToFinished();
}
+ LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
}
private boolean firstPhase(LoadSingleTsFileNode node) {
- final TsFileDataManager tsFileDataManager = new TsFileDataManager(this,
node);
+ final TsFileDataManager tsFileDataManager = new TsFileDataManager(this,
node, block);
try {
new TsFileSplitter(
node.getTsFileResource().getTsFile(),
tsFileDataManager::addOrSendTsFileData)
@@ -407,13 +414,18 @@ public class LoadTsFileScheduler implements IScheduler {
private long dataSize;
private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
private final List<ChunkData> nonDirectionalChunkData;
+ private final LoadTsFileDataCacheMemoryBlock block;
- public TsFileDataManager(LoadTsFileScheduler scheduler,
LoadSingleTsFileNode singleTsFileNode) {
+ public TsFileDataManager(
+ LoadTsFileScheduler scheduler,
+ LoadSingleTsFileNode singleTsFileNode,
+ LoadTsFileDataCacheMemoryBlock block) {
this.scheduler = scheduler;
this.singleTsFileNode = singleTsFileNode;
this.dataSize = 0;
this.replicaSet2Piece = new HashMap<>();
this.nonDirectionalChunkData = new ArrayList<>();
+ this.block = block;
}
private boolean addOrSendTsFileData(TsFileData tsFileData) {
@@ -422,11 +434,16 @@ public class LoadTsFileScheduler implements IScheduler {
: addOrSendChunkData((ChunkData) tsFileData);
}
+ private boolean isMemoryEnough() {
+ return dataSize <= SINGLE_SCHEDULER_MAX_MEMORY_SIZE &&
block.hasEnoughMemory();
+ }
+
private boolean addOrSendChunkData(ChunkData chunkData) {
nonDirectionalChunkData.add(chunkData);
dataSize += chunkData.getDataSize();
+ block.addMemoryUsage(chunkData.getDataSize());
- if (dataSize > MAX_MEMORY_SIZE) {
+ if (!isMemoryEnough()) {
routeChunkData();
// start to dispatch from the biggest TsFilePieceNode
@@ -446,6 +463,7 @@ public class LoadTsFileScheduler implements IScheduler {
}
dataSize -= pieceNode.getDataSize();
+ block.reduceMemoryUsage(pieceNode.getDataSize());
replicaSet2Piece.put(
sortedReplicaSet,
new LoadTsFilePieceNode(
@@ -453,7 +471,7 @@ public class LoadTsFileScheduler implements IScheduler {
singleTsFileNode
.getTsFileResource()
.getTsFile())); // cannot just remove, because of deletions
- if (dataSize <= MAX_MEMORY_SIZE) {
+ if (isMemoryEnough()) {
break;
}
}
@@ -492,6 +510,7 @@ public class LoadTsFileScheduler implements IScheduler {
for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
dataSize += deletionData.getDataSize();
+ block.addMemoryUsage(deletionData.getDataSize());
entry.getValue().addTsFileData(deletionData);
}
return true;
@@ -501,6 +520,7 @@ public class LoadTsFileScheduler implements IScheduler {
routeChunkData();
for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry :
replicaSet2Piece.entrySet()) {
+ block.reduceMemoryUsage(entry.getValue().getDataSize());
if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey()))
{
logger.warn(
"Dispatch piece node {} of TsFile {} error.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 6e6e2afa2e0..ef06d98cebb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.metric.PipeDataNodeMetrics;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
+import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
@@ -85,6 +86,9 @@ public class DataNodeMetricsHelper {
// bind pipe related metrics
MetricService.getInstance().addMetricSet(PipeDataNodeMetrics.getInstance());
+
+ // bind load tsfile memory related metrics
+
MetricService.getInstance().addMetricSet(LoadTsFileMemMetricSet.getInstance());
}
private static void initSystemMetrics() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
index 5d0c53aa611..67c8969dd84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java
@@ -118,6 +118,10 @@ public class Deletion extends Modification implements
Cloneable {
new PartialPath(ReadWriteIOUtils.readString(stream)), 0, startTime,
endTime);
}
+ public long getSerializedSize() {
+ return Long.BYTES * 2 + Integer.BYTES + (long) getPathString().length() *
Character.BYTES;
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 6491f4c4eaa..f4f8ab9f153 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -66,6 +66,7 @@ public class ModificationFile implements AutoCloseable {
private static final long COMPACT_THRESHOLD = 1024 * 1024;
private boolean hasCompacted = false;
+
/**
* Construct a ModificationFile using a file as its storage.
*
@@ -103,6 +104,24 @@ public class ModificationFile implements AutoCloseable {
}
}
+ /**
+ * Write a modification in this file. The modification will first be written to the persistent
+ * store then the memory cache. Notice that this method does not synchronize to physical disk
+ * after writing.
+ *
+ * @param mod the modification to be written.
+ * @throws IOException if IOException is thrown when writing the modification to the store.
+ */
+ public void writeWithoutSync(Modification mod) throws IOException {
+ synchronized (this) {
+ if (needVerify && new File(filePath).exists()) {
+ writer.mayTruncateLastLine();
+ needVerify = false;
+ }
+ writer.writeWithOutSync(mod);
+ }
+ }
+
@GuardedBy("TsFileResource-WriteLock")
public void truncate(long size) {
writer.truncate(size);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
index 75c32adf5e8..7f666eab511 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
@@ -163,12 +163,17 @@ public class LocalTextModificationAccessor
@Override
public void write(Modification mod) throws IOException {
+ writeWithOutSync(mod);
+ force();
+ }
+
+ @Override
+ public void writeWithOutSync(Modification mod) throws IOException {
if (fos == null) {
fos = new FileOutputStream(filePath, true);
}
fos.write(encodeModification(mod).getBytes());
fos.write(System.lineSeparator().getBytes());
- force();
}
@TestOnly
@@ -210,7 +215,7 @@ public class LocalTextModificationAccessor
public void mayTruncateLastLine() {
try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
long filePointer = file.length() - 1;
- if (filePointer == 0) {
+ if (filePointer <= 0) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
index 314405c633c..b4f538ac62d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
@@ -37,6 +37,15 @@ public interface ModificationWriter {
*/
void write(Modification mod) throws IOException;
+ /**
+ * Write a new modification to the persistent medium. Notice that after calling
+ * writeWithOutSync(), a fileWriter is opened. Notice that this method does not synchronize to
+ * physical disk after writing.
+ *
+ * @param mod the modification to be written.
+ */
+ void writeWithOutSync(Modification mod) throws IOException;
+
void truncate(long size);
void mayTruncateLastLine();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index d2b0eb89e1c..61aff528e38 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -147,7 +147,8 @@ public enum Metric {
PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"),
PIPE_PROCEDURE("pipe_procedure"),
PIPE_TASK_STATUS("pipe_task_status"),
- ;
+ // load related
+ LOAD_MEM("load_mem");
final String value;
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
index 9f077c6806e..dacdbc54fc5 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java
@@ -39,17 +39,22 @@ import java.util.NoSuchElementException;
public class TsFileSequenceReaderTimeseriesMetadataIterator
implements Iterator<Map<String, List<TimeseriesMetadata>>> {
- private static final int MAX_TIMESERIES_METADATA_COUNT = 2000;
+ private static final int DEFAULT_TIMESERIES_BATCH_READ_NUMBER = 4000;
private final TsFileSequenceReader reader;
private final boolean needChunkMetadata;
+ private final int timeseriesBatchReadNumber;
private ByteBuffer currentBuffer = null;
+ private long currentEndOffset = Long.MIN_VALUE;
private final Deque<MetadataIndexEntryInfo> metadataIndexEntryStack = new
ArrayDeque<>();
private String currentDeviceId;
private int currentTimeseriesMetadataCount = 0;
public TsFileSequenceReaderTimeseriesMetadataIterator(
- TsFileSequenceReader reader, boolean needChunkMetadata) throws
IOException {
+ TsFileSequenceReader reader, boolean needChunkMetadata, int
timeseriesBatchReadNumber)
+ throws IOException {
this.reader = reader;
+ this.needChunkMetadata = needChunkMetadata;
+ this.timeseriesBatchReadNumber = timeseriesBatchReadNumber;
if (this.reader.tsFileMetaData == null) {
this.reader.readFileMetadata();
@@ -58,7 +63,6 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
final MetadataIndexNode metadataIndexNode =
reader.tsFileMetaData.getMetadataIndex();
long curEntryEndOffset = metadataIndexNode.getEndOffset();
List<MetadataIndexEntry> metadataIndexEntryList =
metadataIndexNode.getChildren();
- this.needChunkMetadata = needChunkMetadata;
for (int i = metadataIndexEntryList.size() - 1; i >= 0; i--) {
metadataIndexEntryStack.push(
@@ -68,6 +72,11 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
}
}
+ public TsFileSequenceReaderTimeseriesMetadataIterator(
+ TsFileSequenceReader reader, boolean needChunkMetadata) throws
IOException {
+ this(reader, needChunkMetadata, DEFAULT_TIMESERIES_BATCH_READ_NUMBER);
+ }
+
@Override
public boolean hasNext() {
return !metadataIndexEntryStack.isEmpty()
@@ -82,7 +91,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
final Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new
HashMap<>();
- while (currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) {
+ while (currentTimeseriesMetadataCount < timeseriesBatchReadNumber) {
// 1. Check Buffer
// currentTimeseriesMetadataCount has reached the limit in the previous
// loop and maybe there is still some data that remains in the buffer.
@@ -90,9 +99,22 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataMap
.computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
.addAll(deserializeTimeseriesMetadata());
+ } else if (currentEndOffset > Long.MIN_VALUE) {
+ try {
+ timeseriesMetadataMap
+ .computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
+
.addAll(deserializeTimeseriesMetadataUsingTsFileInput(currentEndOffset));
+ } catch (IOException e) {
+ throw new TsFileSequenceReaderTimeseriesMetadataIteratorException(
+ String.format(
+ "TsFileSequenceReaderTimeseriesMetadataIterator:
deserializeTimeseriesMetadataUsingTsFileInput failed, "
+ + "currentEndOffset: %d, "
+ + e.getMessage(),
+ currentEndOffset));
+ }
}
- if (currentTimeseriesMetadataCount >= MAX_TIMESERIES_METADATA_COUNT
+ if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber
|| metadataIndexEntryStack.isEmpty()) {
break;
}
@@ -113,7 +135,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
}
// 3. Reset currentTimeseriesMetadataCount
- if (currentTimeseriesMetadataCount >= MAX_TIMESERIES_METADATA_COUNT) {
+ if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber) {
currentTimeseriesMetadataCount = 0;
}
@@ -157,6 +179,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
.computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
.addAll(deserializeTimeseriesMetadata());
} else {
+ currentEndOffset = endOffset;
reader.position(metadataIndexEntry.getOffset());
timeseriesMetadataMap
.computeIfAbsent(currentDeviceId, k -> new ArrayList<>())
@@ -167,7 +190,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator
private List<TimeseriesMetadata> deserializeTimeseriesMetadata() {
final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (currentBuffer.hasRemaining()
- && currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) {
+ && currentTimeseriesMetadataCount < timeseriesBatchReadNumber) {
timeseriesMetadataList.add(
TimeseriesMetadata.deserializeFrom(currentBuffer,
needChunkMetadata));
currentTimeseriesMetadataCount++;
@@ -179,11 +202,14 @@ public class
TsFileSequenceReaderTimeseriesMetadataIterator
throws IOException {
final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
while (reader.position() < endOffset
- && currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) {
+ && currentTimeseriesMetadataCount <
DEFAULT_TIMESERIES_BATCH_READ_NUMBER) {
timeseriesMetadataList.add(
TimeseriesMetadata.deserializeFrom(reader.tsFileInput,
needChunkMetadata));
currentTimeseriesMetadataCount++;
}
+ if (reader.position() >= endOffset) {
+ currentEndOffset = Long.MIN_VALUE;
+ }
return timeseriesMetadataList;
}
diff --git
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
index 54ddaabf03c..5648473662b 100644
---
a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
+++
b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java
@@ -36,8 +36,8 @@ public class
TsFileSequenceReaderTimeseriesMetadataIteratorTest {
@Before
public void before() throws IOException {
- // create 2020 timeseries, 101 measurements per device.
- FileGenerator.generateFile(100, 20, 101);
+ // create 4040 timeseries, 101 measurements per device.
+ FileGenerator.generateFile(100, 40, 101);
TsFileSequenceReader fileReader = new TsFileSequenceReader(FILE_PATH);
tsFile = new TsFileReader(fileReader);
}
@@ -55,8 +55,8 @@ public class
TsFileSequenceReaderTimeseriesMetadataIteratorTest {
new TsFileSequenceReaderTimeseriesMetadataIterator(fileReader, false);
Assert.assertTrue(iterator.hasNext());
- Assert.assertEquals(2000,
iterator.next().values().stream().mapToLong(List::size).sum());
+ Assert.assertEquals(4000,
iterator.next().values().stream().mapToLong(List::size).sum());
Assert.assertTrue(iterator.hasNext());
- Assert.assertEquals(20,
iterator.next().values().stream().mapToLong(List::size).sum());
+ Assert.assertEquals(40,
iterator.next().values().stream().mapToLong(List::size).sum());
}
}