This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc398a011 [IOTDB-3539] Adjust the capacity of SchemaCache according
to schema memory (#6403)
8dc398a011 is described below
commit 8dc398a011409e538b6fa22d207e22532e05da2a
Author: Marcos_Zyk <[email protected]>
AuthorDate: Fri Jun 24 21:33:44 2022 +0800
[IOTDB-3539] Adjust the capacity of SchemaCache according to schema memory
(#6403)
---
.../org/apache/iotdb/commons/path/PartialPath.java | 32 ++++++++++++
.../schemaregion/rocksdb/RSchemaConfLoader.java | 4 +-
.../resources/conf/iotdb-engine.properties | 10 ++--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 58 ++++++++++++++++------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 56 ++++++++++++++++++---
.../db/metadata/cache/DataNodeSchemaCache.java | 8 ++-
.../iotdb/db/metadata/cache/SchemaCacheEntry.java | 23 +++++++++
.../mtree/snapshot/MemMTreeSnapshotUtil.java | 12 +++++
.../iotdb/db/metadata/rescon/MemoryStatistics.java | 2 +-
.../schemaregion/SchemaRegionMemoryImpl.java | 5 +-
.../db/metadata/mtree/disk/MemManagerTest.java | 6 +--
.../db/metadata/schemaRegion/SchemaRegionTest.java | 6 +--
12 files changed, 186 insertions(+), 36 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index e2d51f429f..9f5d5d2a11 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -568,4 +568,36 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
public PartialPath transformToPartialPath() {
return this;
}
+
+ /**
+ * PartialPath basic total, 52B
+ *
+ * <ul>
+ * <li>Object header, 8B
+ * <li>String[] reference + header + length, 8 + 4 + 8= 20B
+ * <li>Path attributes' references, 8 * 3 = 24B
+ * </ul>
+ */
+ public static int estimateSize(PartialPath partialPath) {
+ int size = 52;
+ for (String node : partialPath.getNodes()) {
+ size += estimateStringSize(node);
+ }
+ size += estimateStringSize(partialPath.getFullPath());
+ return size;
+ }
+
+ /**
+ * String basic total, 32B
+ *
+ * <ul>
+ * <li>Object header, 8B
+ * <li>char[] reference + header + length, 8 + 4 + 8= 20B
+ * <li>hash code, 4B
+ * </ul>
+ */
+ private static int estimateStringSize(String string) {
+ // each char takes 2B in Java
+ return string == null ? 0 : 32 + 2 * string.length();
+ }
}
diff --git
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConfLoader.java
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConfLoader.java
index adb14caffa..6fede5e624 100644
---
a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConfLoader.java
+++
b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConfLoader.java
@@ -46,9 +46,9 @@ public class RSchemaConfLoader {
private long writeBufferSize = 64 * SizeUnit.KB;
private long maxTotalWalSize = 64 * SizeUnit.KB;
private long blockCache =
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchema() *
2 / 3;
+
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion() *
2 / 3;
private long blockCacheCompressed =
- IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchema() /
3;
+
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion() /
3;
private static final String ROCKSDB_CONFIG_FILE_NAME =
"schema-rocksdb.properties";
private static final Logger logger =
LoggerFactory.getLogger(RSchemaConfLoader.class);
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 9fa24c0fa6..684bc6f045 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -410,6 +410,11 @@ timestamp_precision=ms
# If you have high level of writing pressure and low level of reading
pressure, please adjust it to for example 6:1:1:2
# write_read_schema_free_memory_proportion=4:3:1:2
+# Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, PartitionCache
and LastCache.
+# The parameter form is a:b:c:d, where a, b, c and d are integers. for
example: 1:1:1:1 , 6:2:1:1
+# In cluster mode, we recommend 5:3:1:1. In standalone mode, we recommend
8:1:0:1
+# schema_memory_allocate_proportion=5:3:1:1
+
# Max number of concurrent writing time partitions in one storage group
# This parameter is used to control total memTable number when memory control
is disabled
# The max number of memTable is 4 * concurrent_writing_time_partition *
storage group number
@@ -1015,11 +1020,6 @@ timestamp_precision=ms
# Datatype: int
# schema_region_device_node_cache_size=10000
-# cache size for DataNode.
-# This cache is used to improve insert speed where each datanode has its
metadata cache and can do path consistency check locally.
-# Datatype: int
-# datanode_schema_cache_size=1000000
-
# thread pool size for read operation in DataNode's coordinator.
# Datatype: int
# coordinator_read_executor_size=50
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4d9c447db8..258bfecee3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -892,12 +892,6 @@ public class IoTDBConfig {
? Runtime.getRuntime().availableProcessors() / 4
: 1;
- /**
- * Cache size of dataNodeSchemaCache in{@link
- * org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache}.
- */
- private int dataNodeSchemaCacheSize = 1000000;
-
/**
* Cache size of partition cache in {@link
* org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher}
@@ -933,6 +927,18 @@ public class IoTDBConfig {
/** ThreadPool size for write operation in coordinator */
private int coordinatorWriteExecutorSize = 50;
+ /** Memory allocated for schemaRegion */
+ private long allocateMemoryForSchemaRegion = allocateMemoryForSchema * 8 /
10;
+
+ /** Memory allocated for SchemaCache */
+ private long allocateMemoryForSchemaCache = allocateMemoryForSchema / 10;
+
+ /** Memory allocated for PartitionCache */
+ private long allocateMemoryForPartitionCache = 0;
+
+ /** Memory allocated for LastCache */
+ private long allocateMemoryForLastCache = allocateMemoryForSchema / 10;
+
IoTDBConfig() {}
public float getUdfMemoryBudgetInMB() {
@@ -2839,14 +2845,6 @@ public class IoTDBConfig {
this.dataNodeId = dataNodeId;
}
- public int getDataNodeSchemaCacheSize() {
- return dataNodeSchemaCacheSize;
- }
-
- public void setDataNodeSchemaCacheSize(int dataNodeSchemaCacheSize) {
- this.dataNodeSchemaCacheSize = dataNodeSchemaCacheSize;
- }
-
public int getPartitionCacheSize() {
return partitionCacheSize;
}
@@ -2946,4 +2944,36 @@ public class IoTDBConfig {
public TEndPoint getAddressAndPort() {
return new TEndPoint(rpcAddress, rpcPort);
}
+
+ public long getAllocateMemoryForSchemaRegion() {
+ return allocateMemoryForSchemaRegion;
+ }
+
+ public void setAllocateMemoryForSchemaRegion(long
allocateMemoryForSchemaRegion) {
+ this.allocateMemoryForSchemaRegion = allocateMemoryForSchemaRegion;
+ }
+
+ public long getAllocateMemoryForSchemaCache() {
+ return allocateMemoryForSchemaCache;
+ }
+
+ public void setAllocateMemoryForSchemaCache(long
allocateMemoryForSchemaCache) {
+ this.allocateMemoryForSchemaCache = allocateMemoryForSchemaCache;
+ }
+
+ public long getAllocateMemoryForPartitionCache() {
+ return allocateMemoryForPartitionCache;
+ }
+
+ public void setAllocateMemoryForPartitionCache(long
allocateMemoryForPartitionCache) {
+ this.allocateMemoryForPartitionCache = allocateMemoryForPartitionCache;
+ }
+
+ public long getAllocateMemoryForLastCache() {
+ return allocateMemoryForLastCache;
+ }
+
+ public void setAllocateMemoryForLastCache(long allocateMemoryForLastCache) {
+ this.allocateMemoryForLastCache = allocateMemoryForLastCache;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5f2987d461..72b804190f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -801,12 +801,6 @@ public class IoTDBDescriptor {
"insert_multi_tablet_enable_multithreading_column_threshold",
String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
- conf.setDataNodeSchemaCacheSize(
- Integer.parseInt(
- properties.getProperty(
- "datanode_schema_cache_size",
- String.valueOf(conf.getDataNodeSchemaCacheSize()))));
-
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance()
.getConfig()
@@ -1421,6 +1415,8 @@ public class IoTDBDescriptor {
logger.info("allocateMemoryForWrite = {}",
conf.getAllocateMemoryForWrite());
logger.info("allocateMemoryForSchema = {}",
conf.getAllocateMemoryForSchema());
+ initSchemaMemoryAllocate(properties);
+
conf.setMaxQueryDeduplicatedPathNum(
Integer.parseInt(
properties.getProperty(
@@ -1460,6 +1456,54 @@ public class IoTDBDescriptor {
}
}
+ private void initSchemaMemoryAllocate(Properties properties) {
+ long schemaMemoryTotal = conf.getAllocateMemoryForSchema();
+
+ int proportionSum = 10;
+ int schemaRegionProportion = 8;
+ int schemaCacheProportion = 1;
+ int partitionCacheProportion = 0;
+ int lastCacheProportion = 1;
+
+ if (conf.isClusterMode()) {
+ schemaRegionProportion = 5;
+ schemaCacheProportion = 3;
+ partitionCacheProportion = 1;
+ }
+
+ String schemaMemoryAllocatePortion =
+ properties.getProperty("schema_memory_allocate_proportion");
+ if (schemaMemoryAllocatePortion != null) {
+ String[] proportions = schemaMemoryAllocatePortion.split(":");
+ int loadedProportionSum = 0;
+ for (String proportion : proportions) {
+ loadedProportionSum += Integer.parseInt(proportion.trim());
+ }
+
+ if (loadedProportionSum != 0) {
+ proportionSum = loadedProportionSum;
+ schemaRegionProportion = Integer.parseInt(proportions[0].trim());
+ schemaCacheProportion = Integer.parseInt(proportions[1].trim());
+ partitionCacheProportion = Integer.parseInt(proportions[2].trim());
+ lastCacheProportion = Integer.parseInt(proportions[3].trim());
+ }
+ }
+
+ conf.setAllocateMemoryForSchemaRegion(
+ schemaMemoryTotal * schemaRegionProportion / proportionSum);
+ logger.info("allocateMemoryForSchemaRegion = {}",
conf.getAllocateMemoryForSchemaRegion());
+
+ conf.setAllocateMemoryForSchemaCache(schemaMemoryTotal *
schemaCacheProportion / proportionSum);
+ logger.info("allocateMemoryForSchemaCache = {}",
conf.getAllocateMemoryForSchemaCache());
+
+ conf.setAllocateMemoryForPartitionCache(
+ schemaMemoryTotal * partitionCacheProportion / proportionSum);
+ logger.info("allocateMemoryForPartitionCache = {}",
conf.getAllocateMemoryForPartitionCache());
+
+ conf.setAllocateMemoryForLastCache(schemaMemoryTotal * lastCacheProportion
/ proportionSum);
+ logger.info("allocateMemoryForLastCache = {}",
conf.getAllocateMemoryForLastCache());
+ }
+
@SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
private void loadUDFProps(Properties properties) {
String initialByteArrayLengthForMemoryControl =
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index 23fa436c8f..f7a06b900d 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -46,7 +46,13 @@ public class DataNodeSchemaCache {
private final Cache<PartialPath, SchemaCacheEntry> cache;
private DataNodeSchemaCache() {
- cache =
Caffeine.newBuilder().maximumSize(config.getDataNodeSchemaCacheSize()).build();
+ cache =
+ Caffeine.newBuilder()
+ .maximumWeight(config.getAllocateMemoryForSchemaCache())
+ .weigher(
+ (PartialPath key, SchemaCacheEntry value) ->
+ PartialPath.estimateSize(key) +
SchemaCacheEntry.estimateSize(value))
+ .build();
if
(MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
// add metrics
MetricsService.getInstance()
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
index a9b8c97616..03ddf17360 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/SchemaCacheEntry.java
@@ -67,4 +67,27 @@ public class SchemaCacheEntry {
public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) {
this.lastCacheContainer = lastCacheContainer;
}
+
+ /**
+ * Total basic 100B
+ *
+ * <ul>
+ * <li>SchemaCacheEntry Object header, 8B
+ * <li>isAligned, 1B
+ * <li>LastCacheContainer reference, 8B
+ * <li>MeasurementSchema
+ * <ul>
+ * <li>Reference, 8B
+ * <li>Object header, 8B
+ * <li>String measurementId basic, 8 + 8 + 4 + 8 + 4 = 32B
+ * <li>type, encoding, compressor, 3 B
+ * <li>encodingConverter, 8 + 8 + 8 = 24B
+ * <li>props, 8B
+ * </ul>
+ * </ul>
+ */
+ public static int estimateSize(SchemaCacheEntry schemaCacheEntry) {
+ // each char takes 2B in Java
+ return 100 + 2 *
schemaCacheEntry.getMeasurementSchema().getMeasurementId().length();
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
index f5fb4876d4..6059872c20 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/snapshot/MemMTreeSnapshotUtil.java
@@ -28,9 +28,12 @@ import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupEntityMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.estimator.BasicMNodSizeEstimator;
+import org.apache.iotdb.db.metadata.mnode.estimator.IMNodeSizeEstimator;
import org.apache.iotdb.db.metadata.mnode.iterator.IMNodeIterator;
import org.apache.iotdb.db.metadata.mnode.visitor.MNodeVisitor;
import org.apache.iotdb.db.metadata.mtree.store.MemMTreeStore;
+import org.apache.iotdb.db.metadata.rescon.MemoryStatistics;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -63,6 +66,8 @@ public class MemMTreeSnapshotUtil {
"Error occurred during deserializing MemMTree.";
private static final byte VERSION = 0;
+ private static final MemoryStatistics MEMORY_STATISTICS =
MemoryStatistics.getInstance();
+ private static final IMNodeSizeEstimator ESTIMATOR = new
BasicMNodSizeEstimator();
public static boolean createSnapshot(File snapshotDir, MemMTreeStore store) {
File snapshotTmp =
@@ -105,6 +110,11 @@ public class MemMTreeSnapshotUtil {
SystemFileFactory.INSTANCE.getFile(snapshotDir,
MetadataConstant.MTREE_SNAPSHOT);
try (BufferedInputStream inputStream = new BufferedInputStream(new
FileInputStream(snapshot))) {
return deserializeFrom(inputStream, measurementProcess);
+ } catch (Throwable e) {
+ // This method is only invoked during recovery. If failed, the memory
usage should be cleared
+ // since the loaded schema will not be used.
+ MEMORY_STATISTICS.clear();
+ throw e;
}
}
@@ -203,6 +213,8 @@ public class MemMTreeSnapshotUtil {
throw new IOException("Unrecognized MNode type " + type);
}
+ MEMORY_STATISTICS.requestMemory(ESTIMATOR.estimateSize(node));
+
if (!ancestors.isEmpty()) {
node.setParent(ancestors.peek());
ancestors.peek().addChild(node);
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemoryStatistics.java
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemoryStatistics.java
index b2070fe9f1..78d2e76db2 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemoryStatistics.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemoryStatistics.java
@@ -53,7 +53,7 @@ public class MemoryStatistics {
private MemoryStatistics() {}
public void init() {
- memoryCapacity =
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchema();
+ memoryCapacity =
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForSchemaRegion();
memoryUsage.getAndSet(0);
allowToCreateNewSeries = true;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index d42d02c334..a34bd22cad 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -199,7 +199,10 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
// be clear first
if (config.isClusterMode()
&&
config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RatisConsensus))
{
- FileUtils.deleteDirectory(new File(schemaRegionDirPath));
+ File schemaRegionDir = new File(schemaRegionDirPath);
+ if (schemaRegionDir.exists()) {
+ FileUtils.deleteDirectory(schemaRegionDir);
+ }
}
init();
diff --git
a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/disk/MemManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/disk/MemManagerTest.java
index c9b35fcb33..e76000e984 100644
---
a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/disk/MemManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/disk/MemManagerTest.java
@@ -56,15 +56,15 @@ public class MemManagerTest {
public void setUp() throws Exception {
config = IoTDBDescriptor.getInstance().getConfig();
config.setSchemaEngineMode(SchemaEngineMode.Schema_File.toString());
- rawMemorySize = config.getAllocateMemoryForSchema();
- config.setAllocateMemoryForSchema(1500);
+ rawMemorySize = config.getAllocateMemoryForSchemaRegion();
+ config.setAllocateMemoryForSchemaRegion(1500);
EnvironmentUtils.envSetUp();
}
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
- config.setAllocateMemoryForSchema(rawMemorySize);
+ config.setAllocateMemoryForSchemaRegion(rawMemorySize);
config.setSchemaEngineMode(SchemaEngineMode.Memory.toString());
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTest.java
b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTest.java
index 541ac96513..d1240a9530 100644
---
a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionTest.java
@@ -63,12 +63,12 @@ public class SchemaRegionTest {
isMppMode = config.isMppMode();
isClusterMode = config.isClusterMode();
schemaRegionConsensusProtocolClass =
config.getSchemaRegionConsensusProtocolClass();
- schemaMemory = config.getAllocateMemoryForSchema();
+ schemaMemory = config.getAllocateMemoryForSchemaRegion();
config.setMppMode(true);
config.setClusterMode(true);
config.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RatisConsensus);
- config.setAllocateMemoryForSchema(1024 * 1024 * 1024);
+ config.setAllocateMemoryForSchemaRegion(1024 * 1024 * 1024);
EnvironmentUtils.envSetUp();
}
@@ -78,7 +78,7 @@ public class SchemaRegionTest {
config.setMppMode(isMppMode);
config.setClusterMode(isClusterMode);
config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
- config.setAllocateMemoryForSchema(schemaMemory);
+ config.setAllocateMemoryForSchemaRegion(schemaMemory);
}
@Test