This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f699e15762 Implement DataNode cache TableSchema function to prevent OOM (#16412)
9f699e15762 is described below

commit 9f699e15762ed9ffe6b8986560024fa3178c1d7f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Sep 16 14:20:39 2025 +0800

    Implement DataNode cache TableSchema function to prevent OOM (#16412)
    
    * Implement DataNode cache TableSchema function to prevent OOM
    
    * add config
    
    * update
    
    * update
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++++++
 .../db/storageengine/dataregion/DataRegion.java    | 23 +++++++++++++++++++++-
 .../conf/iotdb-system.properties.template          |  5 +++++
 4 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f55414712a9..58958ad4415 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -193,6 +193,8 @@ public class IoTDBConfig {
   /** Minimum ratio of effective information in wal files */
   private volatile double walMinEffectiveInfoRatio = 0.1;
 
+  private volatile long dataNodeTableSchemaCacheSize = 1 << 20;
+
   /**
    * MemTable size threshold for triggering MemTable snapshot in wal. When a memTable's size exceeds
    * this, wal can flush this memtable to disk, otherwise wal will snapshot this memtable in wal.
@@ -1924,6 +1926,17 @@ public class IoTDBConfig {
     this.walMinEffectiveInfoRatio = walMinEffectiveInfoRatio;
   }
 
+  public long getDataNodeTableSchemaCacheSize() {
+    return dataNodeTableSchemaCacheSize;
+  }
+
+  public void setDataNodeTableSchemaCacheSize(long dataNodeTableSchemaCacheSize) {
+    if (dataNodeTableSchemaCacheSize < 0) {
+      return;
+    }
+    this.dataNodeTableSchemaCacheSize = dataNodeTableSchemaCacheSize;
+  }
+
   public long getWalMemTableSnapshotThreshold() {
     return walMemTableSnapshotThreshold;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2aa679e6db8..1f793f70aa2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -982,6 +982,12 @@ public class IoTDBDescriptor {
                 "coordinator_write_executor_size",
                 Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
 
+    conf.setDataNodeTableSchemaCacheSize(
+        Long.parseLong(
+            properties.getProperty(
+                "data_node_table_schema_cache_max_size_in_bytes",
+                String.valueOf(conf.getDataNodeTableSchemaCacheSize()))));
+
     // Commons
     commonDescriptor.loadCommonProps(properties);
     commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d88085974af..285f3afd3f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource.Status;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -157,6 +158,8 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.commons.io.FileUtils;
 import org.apache.thrift.TException;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -249,6 +252,15 @@ public class DataRegion implements IDataRegionForQuery {
 
   private static final Logger logger = LoggerFactory.getLogger(DataRegion.class);
 
+  // Cache TableSchema to prevent OOM
+  private static final Cache<String, org.apache.tsfile.file.metadata.TableSchema> SCHEMA_CACHE =
+      Caffeine.newBuilder()
+          .maximumWeight(config.getDataNodeTableSchemaCacheSize())
+          .weigher(
+              (String k, org.apache.tsfile.file.metadata.TableSchema v) ->
+                  (int) PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(v))
+          .build();
+
   /**
    * A read write lock for guaranteeing concurrent safety when accessing all fields in this class
    * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -1447,7 +1459,16 @@ public class DataRegion implements IDataRegionForQuery {
                 throw new TableLostRuntimeException(getDatabaseName(), tableName);
               }
             }
-            return TableSchema.of(tsTable).toTsFileTableSchemaNoAttribute();
+
+            org.apache.tsfile.file.metadata.TableSchema tableSchema =
+                TableSchema.of(tsTable).toTsFileTableSchemaNoAttribute();
+            org.apache.tsfile.file.metadata.TableSchema cachedSchema =
+                SCHEMA_CACHE.getIfPresent(tableName);
+            if (Objects.equals(cachedSchema, tableSchema)) {
+              return cachedSchema;
+            }
+            SCHEMA_CACHE.put(tableName, tableSchema);
+            return tableSchema;
           });
     }
   }
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index c03f49a3ac3..d35a191b97d 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -807,6 +807,11 @@ device_path_cache_proportion=0.05
 # Datatype: double
 write_memory_variation_report_proportion=0.001
 
+# The maximum memory size in bytes for DataNode to table schema cache
+# effectiveMode: restart
+# Datatype: long
+data_node_table_schema_cache_max_size_in_bytes=1048576
+
 # When an inserting is rejected, waiting period (in ms) to check system again, 50 by default.
 # If the insertion has been rejected and the read load is low, it can be set larger.
 # effectiveMode: restart

Reply via email to