This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9f699e15762 Implement DataNode cache TableSchema function to prevent OOM (#16412)
9f699e15762 is described below
commit 9f699e15762ed9ffe6b8986560024fa3178c1d7f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Sep 16 14:20:39 2025 +0800
Implement DataNode cache TableSchema function to prevent OOM (#16412)
* Implement DataNode cache TableSchema function to prevent OOM
* add config
* update
* update
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++++++
.../db/storageengine/dataregion/DataRegion.java | 23 +++++++++++++++++++++-
.../conf/iotdb-system.properties.template | 5 +++++
4 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f55414712a9..58958ad4415 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -193,6 +193,8 @@ public class IoTDBConfig {
/** Minimum ratio of effective information in wal files */
private volatile double walMinEffectiveInfoRatio = 0.1;
+ private volatile long dataNodeTableSchemaCacheSize = 1 << 20; // 1 << 20 = 1048576 bytes, the same default as data_node_table_schema_cache_max_size_in_bytes in iotdb-system.properties.template
+
/**
* MemTable size threshold for triggering MemTable snapshot in wal. When a
memTable's size exceeds
* this, wal can flush this memtable to disk, otherwise wal will snapshot
this memtable in wal.
@@ -1924,6 +1926,17 @@ public class IoTDBConfig {
this.walMinEffectiveInfoRatio = walMinEffectiveInfoRatio;
}
+ public long getDataNodeTableSchemaCacheSize() { // Maximum total weight, in bytes, used as the bound of DataRegion's TableSchema cache.
+ return dataNodeTableSchemaCacheSize;
+ }
+
+ public void setDataNodeTableSchemaCacheSize(long
dataNodeTableSchemaCacheSize) { // Populated from data_node_table_schema_cache_max_size_in_bytes.
+ if (dataNodeTableSchemaCacheSize < 0) { // Negative values are silently ignored; the previous value is kept.
+ return;
+ }
+ this.dataNodeTableSchemaCacheSize = dataNodeTableSchemaCacheSize;
+ }
+
public long getWalMemTableSnapshotThreshold() {
return walMemTableSnapshotThreshold;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2aa679e6db8..1f793f70aa2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -982,6 +982,12 @@ public class IoTDBDescriptor {
"coordinator_write_executor_size",
Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+ conf.setDataNodeTableSchemaCacheSize(
+ Long.parseLong(
+ properties.getProperty(
+ "data_node_table_schema_cache_max_size_in_bytes",
+ String.valueOf(conf.getDataNodeTableSchemaCacheSize()))));
+
// Commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d88085974af..285f3afd3f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource.Status;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import
org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -157,6 +158,8 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -249,6 +252,15 @@ public class DataRegion implements IDataRegionForQuery {
private static final Logger logger =
LoggerFactory.getLogger(DataRegion.class);
+ // Cache TableSchema to prevent OOM. NOTE(review): this cache is static, so every DataRegion shares it, and it is keyed by table name alone; same-named tables in different databases keep replacing each other's entry, which defeats the instance reuse — consider keying by database name + table name.
+ private static final Cache<String,
org.apache.tsfile.file.metadata.TableSchema> SCHEMA_CACHE =
+ Caffeine.newBuilder()
+ .maximumWeight(config.getDataNodeTableSchemaCacheSize()) // Read once when the class is initialized (static field); later config changes do not resize the cache.
+ .weigher(
+ (String k, org.apache.tsfile.file.metadata.TableSchema v) ->
+ (int) PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(v)) // NOTE(review): narrowing cast — if the helper returns long, sizes above Integer.MAX_VALUE wrap and Caffeine rejects negative weights; confirm the return type or clamp the value.
+ .build();
+
/**
* A read write lock for guaranteeing concurrent safety when accessing all
fields in this class
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -1447,7 +1459,16 @@ public class DataRegion implements IDataRegionForQuery {
throw new TableLostRuntimeException(getDatabaseName(),
tableName);
}
}
- return TableSchema.of(tsTable).toTsFileTableSchemaNoAttribute();
+
+ org.apache.tsfile.file.metadata.TableSchema tableSchema =
+ TableSchema.of(tsTable).toTsFileTableSchemaNoAttribute();
+ org.apache.tsfile.file.metadata.TableSchema cachedSchema =
+ SCHEMA_CACHE.getIfPresent(tableName);
+ if (Objects.equals(cachedSchema, tableSchema)) {
+ return cachedSchema;
+ }
+ SCHEMA_CACHE.put(tableName, tableSchema);
+ return tableSchema;
});
}
}
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index c03f49a3ac3..d35a191b97d 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -807,6 +807,11 @@ device_path_cache_proportion=0.05
# Datatype: double
write_memory_variation_report_proportion=0.001
+# The maximum memory size, in bytes, of the DataNode table schema cache
+# effectiveMode: restart
+# Datatype: long
+data_node_table_schema_cache_max_size_in_bytes=1048576
+
# When an inserting is rejected, waiting period (in ms) to check system again, 50 by default.
# If the insertion has been rejected and the read load is low, it can be set larger.
# effectiveMode: restart