This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 9e1143f4592 Pipe: Fix the Out of memory problem in WALInsertNodeCache
memory management under extreme conditions (#13538)
9e1143f4592 is described below
commit 9e1143f45920eb9bc48a568c793fe8767e2039ef
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Sep 18 18:31:45 2024 +0800
Pipe: Fix the Out of memory problem in WALInsertNodeCache memory management
under extreme conditions (#13538)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +
.../resource/memory/InsertNodeMemoryEstimator.java | 802 +++++++++++++++++++++
.../plan/planner/plan/node/write/InsertNode.java | 4 +
.../dataregion/wal/utils/WALInsertNodeCache.java | 44 +-
.../consensus/index/impl/IoTProgressIndex.java | 9 +
6 files changed, 870 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9f6355b99eb..d2619b2ce04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -244,6 +244,13 @@ public class IoTDBConfig {
/** The period when outdated wal files are periodically deleted. Unit:
millisecond */
private volatile long deleteWalFilesPeriodInMs = 20 * 1000L;
+ /**
+ * Enables or disables the automatic clearing of the WAL cache when a memory
compaction is
+ * triggered. When enabled, the WAL cache will be cleared to release memory
during the compaction
+ * process.
+ */
+ private volatile boolean WALCacheShrinkClearEnabled = true;
+
// endregion
/**
* The cycle when metadata log is periodically forced to be written to
disk(in milliseconds) If
@@ -2002,6 +2009,14 @@ public class IoTDBConfig {
this.deleteWalFilesPeriodInMs = deleteWalFilesPeriodInMs;
}
+ public boolean getWALCacheShrinkClearEnabled() {
+ return WALCacheShrinkClearEnabled;
+ }
+
+ void setWALCacheShrinkClearEnabled(boolean WALCacheShrinkClearEnabled) {
+ this.WALCacheShrinkClearEnabled = WALCacheShrinkClearEnabled;
+ }
+
public boolean isChunkBufferPoolEnable() {
return chunkBufferPoolEnable;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 546cea28759..442cdbb1c4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1177,6 +1177,15 @@ public class IoTDBDescriptor {
conf.setWalBufferQueueCapacity(walBufferQueueCapacity);
}
+ boolean WALInsertNodeCacheShrinkClearEnabled =
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "wal_cache_shrink_clear_enabled",
+ Boolean.toString(conf.getWALCacheShrinkClearEnabled())));
+ if (conf.getWALCacheShrinkClearEnabled() !=
WALInsertNodeCacheShrinkClearEnabled) {
+ conf.setWALCacheShrinkClearEnabled(WALInsertNodeCacheShrinkClearEnabled);
+ }
+
loadWALHotModifiedProps(properties);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
new file mode 100644
index 00000000000..7883dda30a3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -0,0 +1,802 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
+import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
+import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class InsertNodeMemoryEstimator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InsertNodeMemoryEstimator.class);
+
+ private static final String INSERT_TABLET_NODE = "InsertTabletNode";
+ private static final String INSERT_ROW_NODE = "InsertRowNode";
+ private static final String INSERT_ROWS_NODE = "InsertRowsNode";
+ private static final String INSERT_ROWS_OF_ONE_DEVICE_NODE =
"InsertRowsOfOneDeviceNode";
+ private static final String INSERT_MULTI_TABLETS_NODE =
"InsertMultiTabletsNode";
+ private static final String RELATIONAL_INSERT_ROW_NODE =
"RelationalInsertRowNode";
+
+ private static final long NUM_BYTES_OBJECT_REF =
RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+ private static final long NUM_BYTES_OBJECT_HEADER =
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
+ private static final long NUM_BYTES_ARRAY_HEADER =
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+
+ private static final long REENTRANT_READ_WRITE_LOCK_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.class);
+
+ private static final long TS_ENCODING_PLAIN_BUILDER_SIZE =
+
RamUsageEstimator.shallowSizeOf(TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN));
+
+ // =============================InsertNode==================================
+
+ private static final long INSERT_TABLET_NODE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertTabletNode.class);
+
+ private static final long INSERT_ROW_NODE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertRowNode.class);
+
+ private static final long INSERT_ROWS_NODE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertRowsNode.class);
+
+ private static final long INSERT_ROWS_OF_ONE_DEVICE_NODE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertRowsOfOneDeviceNode.class);
+
+ private static final long INSERT_MULTI_TABLETS_NODE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(InsertMultiTabletsNode.class);
+
+ // ============================Device And
Measurement===================================
+
+ private static final long PARTIAL_PATH_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(PartialPath.class);
+
+ private static final long MEASUREMENT_SCHEMA_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(MeasurementSchema.class);
+
+ private static final long PLAIN_DEVICE_ID_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(PlainDeviceID.class);
+
+ // =============================Thrift==================================
+
+ private static final long T_REGION_REPLICA_SET_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TRegionReplicaSet.class);
+
+ private static final long T_DATA_NODE_LOCATION_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TDataNodeLocation.class);
+
+ private static final long TS_STATUS_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TSStatus.class);
+
+ private static final long T_END_POINT_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TEndPoint.class);
+
+ private static final long T_CONSENSUS_GROUP_ID_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TConsensusGroupId.class);
+
+ //
=============================ProgressIndex==================================
+
+ private static final long HYBRID_PROGRESS_INDEX_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class);
+
+ private static final long IOT_PROGRESS_INDEX_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(IoTProgressIndex.class);
+
+ private static final long META_PROGRESS_INDEX_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(MetaProgressIndex.class);
+
+ private static final long RECOVER_PROGRESS_INDEX_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(RecoverProgressIndex.class);
+
+ private static final long SIMPLE_PROGRESS_INDEX_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(SimpleProgressIndex.class);
+
+ private static final long STATE_PROGRESS_INDEX_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(StateProgressIndex.class);
+
+ private static final long TIME_WINDOW_STATE_PROGRESS_INDEX_SIZE =
+
RamUsageEstimator.shallowSizeOfInstance(TimeWindowStateProgressIndex.class);
+
+ // =============================BitMap==================================
+
+ private static final long BIT_MAP_SIZE =
RamUsageEstimator.shallowSizeOfInstance(BitMap.class);
+
+ // ============================= Primitive Type Wrapper Classes =========
+
+ private static final long SIZE_OF_LONG =
+ RamUsageEstimator.alignObjectSize(Long.BYTES + NUM_BYTES_OBJECT_HEADER);
+ private static final long SIZE_OF_INT =
+ RamUsageEstimator.alignObjectSize(Integer.BYTES +
NUM_BYTES_OBJECT_HEADER);
+ private static final long SIZE_OF_DOUBLE =
+ RamUsageEstimator.alignObjectSize(Double.BYTES +
NUM_BYTES_OBJECT_HEADER);
+ private static final long SIZE_OF_FLOAT =
+ RamUsageEstimator.alignObjectSize(Float.BYTES + NUM_BYTES_OBJECT_HEADER);
+ private static final long SIZE_OF_BOOLEAN =
+ RamUsageEstimator.alignObjectSize(1 + NUM_BYTES_OBJECT_HEADER);
+ private static final long SIZE_OF_SHORT =
+ RamUsageEstimator.alignObjectSize(Short.BYTES + NUM_BYTES_OBJECT_HEADER);
+ private static final long SIZE_OF_STRING =
RamUsageEstimator.shallowSizeOfInstance(String.class);
+
+ // The calculated result needs to be magnified by 1.3 times, which is 1.3
times different
+ // from the actual result because the properties of the parent class are not
added.
+ private static final double INSERT_ROW_NODE_EXPANSION_FACTOR = 1.3;
+
+ public static long sizeOf(final InsertNode insertNode) {
+ try {
+ final String className = insertNode.getClass().getSimpleName();
+ switch (className) {
+ case INSERT_TABLET_NODE:
+ return sizeOfInsertTabletNode((InsertTabletNode) insertNode);
+ case INSERT_ROW_NODE:
+ return (long)
+ (sizeOfInsertRowNode((InsertRowNode) insertNode) *
INSERT_ROW_NODE_EXPANSION_FACTOR);
+ case INSERT_ROWS_NODE:
+ return sizeOfInsertRowsNode((InsertRowsNode) insertNode);
+ case INSERT_ROWS_OF_ONE_DEVICE_NODE:
+ return sizeOfInsertRowsOfOneDeviceNode((InsertRowsOfOneDeviceNode)
insertNode);
+ case INSERT_MULTI_TABLETS_NODE:
+ return sizeOfInsertMultiTabletsNode((InsertMultiTabletsNode)
insertNode);
+ default:
+ return 0L;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to estimate size for InsertNode: {}",
e.getMessage(), e);
+ return 0L;
+ }
+ }
+
+ // =============================InsertNode==================================
+
+ private static long calculateFullInsertNodeSize(final InsertNode node) {
+ long size = 0;
+ // PartialPath
+ size += sizeOfPartialPath(node.getDevicePath());
+ // MeasurementSchemas
+ size += sizeOfMeasurementSchemas(node.getMeasurementSchemas());
+ // Measurement
+ size += sizeOfMeasurement(node.getMeasurements());
+ // dataTypes
+ size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
+ // deviceID
+ if (node.isDeviceIDExists()) {
+ size += sizeOfIDeviceID(node.getDeviceID());
+ }
+ // dataRegionReplicaSet
+ size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet());
+ // progressIndex
+ size += sizeOfProgressIndex(node.getProgressIndex());
+ return size;
+ }
+
+ private static long calculateInsertNodeSizeExcludingSchemas(final InsertNode
node) {
+ // Measurement
+ long size = 2 *
RamUsageEstimator.shallowSizeOf(node.getMeasurementSchemas());
+ // dataTypes
+ size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
+ // deviceID
+ if (node.isDeviceIDExists()) {
+ size += sizeOfIDeviceID(node.getDeviceID());
+ }
+ // dataRegionReplicaSet
+ size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet());
+ // progressIndex
+ size += sizeOfProgressIndex(node.getProgressIndex());
+ return size;
+ }
+
+ private static long sizeOfInsertTabletNode(final InsertTabletNode node) {
+ long size = INSERT_TABLET_NODE_SIZE;
+ size += calculateFullInsertNodeSize(node);
+ size += sizeOfTimes(node.getTimes());
+ size += sizeOfBitMapArray(node.getBitMaps());
+ size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
+ final List<Integer> range = node.getRange();
+ if (range != null) {
+ size += NUM_BYTES_OBJECT_HEADER + SIZE_OF_INT * range.size();
+ }
+ return size;
+ }
+
+ private static long calculateInsertTabletNodeSizeExcludingSchemas(final
InsertTabletNode node) {
+ long size = INSERT_TABLET_NODE_SIZE;
+
+ size += calculateInsertNodeSizeExcludingSchemas(node);
+
+ size += sizeOfTimes(node.getTimes());
+
+ size += sizeOfBitMapArray(node.getBitMaps());
+
+ size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
+
+ final List<Integer> range = node.getRange();
+ if (range != null) {
+ size += NUM_BYTES_OBJECT_HEADER + SIZE_OF_INT * range.size();
+ }
+ return size;
+ }
+
+ private static long sizeOfInsertRowNode(final InsertRowNode node) {
+ long size = INSERT_ROW_NODE_SIZE;
+ size += calculateFullInsertNodeSize(node);
+ size += sizeOfValues(node.getValues(), node.getMeasurementSchemas());
+ return size;
+ }
+
+ private static long calculateInsertRowNodeExcludingSchemas(final
InsertRowNode node) {
+ long size = INSERT_ROW_NODE_SIZE;
+ size += calculateInsertNodeSizeExcludingSchemas(node);
+ size += sizeOfValues(node.getValues(), node.getMeasurementSchemas());
+ return size;
+ }
+
+ private static long sizeOfInsertRowsNode(final InsertRowsNode node) {
+ long size = INSERT_ROWS_NODE_SIZE;
+ size += calculateFullInsertNodeSize(node);
+ final List<InsertRowNode> rows = node.getInsertRowNodeList();
+ final List<Integer> indexList = node.getInsertRowNodeIndexList();
+ if (rows != null && !rows.isEmpty()) {
+ // InsertRowNodeList
+ size += NUM_BYTES_OBJECT_HEADER;
+ size +=
+ (calculateInsertRowNodeExcludingSchemas(rows.get(0)) +
NUM_BYTES_OBJECT_REF)
+ * rows.size();
+ size += sizeOfPartialPath(rows.get(0).getDevicePath());
+ size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
+ // InsertRowNodeIndexList
+ size += NUM_BYTES_OBJECT_HEADER;
+ size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
+ }
+ return size;
+ }
+
+ private static long sizeOfInsertRowsOfOneDeviceNode(final
InsertRowsOfOneDeviceNode node) {
+ long size = INSERT_ROWS_OF_ONE_DEVICE_NODE_SIZE;
+ size += calculateFullInsertNodeSize(node);
+ final List<InsertRowNode> rows = node.getInsertRowNodeList();
+ final List<Integer> indexList = node.getInsertRowNodeIndexList();
+ if (rows != null && !rows.isEmpty()) {
+ // InsertRowNodeList
+ size += NUM_BYTES_OBJECT_HEADER;
+ size +=
+ (calculateInsertRowNodeExcludingSchemas(rows.get(0)) +
NUM_BYTES_OBJECT_REF)
+ * rows.size();
+ size += sizeOfPartialPath(rows.get(0).getDevicePath());
+ size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
+ // InsertRowNodeIndexList
+ size += NUM_BYTES_OBJECT_HEADER;
+ size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
+ }
+ // results
+ size += NUM_BYTES_OBJECT_HEADER;
+ for (Map.Entry<Integer, TSStatus> entry : node.getResults().entrySet()) {
+ size +=
+ Integer.BYTES
+ + sizeOfTSStatus(entry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ }
+ return size;
+ }
+
+ private static long sizeOfInsertMultiTabletsNode(final
InsertMultiTabletsNode node) {
+ long size = INSERT_MULTI_TABLETS_NODE_SIZE;
+ size += calculateFullInsertNodeSize(node);
+ // dataTypes
+ size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
+
+ final List<InsertTabletNode> rows = node.getInsertTabletNodeList();
+ final List<Integer> indexList = node.getParentInsertTabletNodeIndexList();
+ if (rows != null && !rows.isEmpty()) {
+ // InsertTabletNodeList
+ size += NUM_BYTES_OBJECT_HEADER;
+ size +=
+ (calculateInsertTabletNodeSizeExcludingSchemas(rows.get(0)) +
NUM_BYTES_OBJECT_REF)
+ * rows.size();
+ size += sizeOfPartialPath(rows.get(0).getDevicePath());
+ size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
+ // ParentInsertTabletNodeIndexList
+ size += NUM_BYTES_OBJECT_HEADER;
+ size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
+ }
+ // results
+ if (node.getResults() != null) {
+ size += NUM_BYTES_OBJECT_HEADER;
+ for (Map.Entry<Integer, TSStatus> entry : node.getResults().entrySet()) {
+ size +=
+ Integer.BYTES
+ + sizeOfTSStatus(entry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ }
+ }
+ return size;
+ }
+
+ // ============================Device And
Measurement===================================
+
+ private static long sizeOfPartialPath(final PartialPath partialPath) {
+ if (partialPath == null) {
+ return 0L;
+ }
+ long size = PARTIAL_PATH_SIZE;
+ final String[] nodes = partialPath.getNodes();
+ if (nodes != null) {
+ // Since fullPath may be lazy loaded, lazy loading will not be triggered
here, so it is
+ // assumed that the memory size of fullPath is the same as that of nodes.
+ size += sizeOfStringArray(nodes) * 2;
+ size += TsFileConstant.PATH_SEPARATOR.length() * (nodes.length - 1) +
NUM_BYTES_OBJECT_HEADER;
+ }
+ return size;
+ }
+
+ private static long sizeOfMeasurementSchemas(final MeasurementSchema[]
measurementSchemas) {
+ if (measurementSchemas == null) {
+ return 0L;
+ }
+ long size =
+ RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF *
measurementSchemas.length);
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ size += sizeOfMeasurementSchema(measurementSchema);
+ }
+ return size;
+ }
+
+ private static long sizeOfMeasurementSchema(final MeasurementSchema
measurementSchema) {
+ if (measurementSchema == null) {
+ return 0L;
+ }
+ // Header + primitive + reference
+ long size = MEASUREMENT_SCHEMA_SIZE;
+ // measurementId
+ size += sizeOfString(measurementSchema.getMeasurementId());
+ // props
+ final Map<String, String> props = measurementSchema.getProps();
+ if (props != null) {
+ size += NUM_BYTES_OBJECT_HEADER;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ size +=
+ sizeOfString(entry.getKey())
+ + sizeOfString(entry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ }
+ }
+ size += TS_ENCODING_PLAIN_BUILDER_SIZE;
+ return size;
+ }
+
+ private static long sizeOfMeasurement(final String[] measurement) {
+ if (measurement == null) {
+ return 0L;
+ }
+ return sizeOfStringArray(measurement);
+ }
+
+ private static long sizeOfIDeviceID(final IDeviceID deviceID) {
+ if (deviceID == null) {
+ return 0L;
+ }
+ return sizeOfPlainDeviceID((PlainDeviceID) deviceID);
+ }
+
+ private static long sizeOfPlainDeviceID(final PlainDeviceID deviceID) {
+ long size = PLAIN_DEVICE_ID_SIZE;
+ final String id = deviceID.toString();
+
+ if (id != null) {
+ // Estimate the sum of the table and segment lengths to be the size of
the id
+ size += sizeOfString(id) * 2;
+ }
+
+ return size;
+ }
+
+ // =============================Thrift==================================
+
+ private static long sizeOfTRegionReplicaSet(final TRegionReplicaSet
tRegionReplicaSet) {
+ if (tRegionReplicaSet == null) {
+ return 0L;
+ }
+ // Memory alignment of basic types and reference types in structures
+ long size = T_REGION_REPLICA_SET_SIZE;
+ // Memory calculation in reference type, cannot get exact value, roughly
estimate
+ if (tRegionReplicaSet.isSetRegionId()) {
+ size += sizeOfTConsensusGroupId();
+ }
+ if (tRegionReplicaSet.isSetDataNodeLocations()) {
+ size += NUM_BYTES_OBJECT_HEADER;
+ for (TDataNodeLocation tDataNodeLocation :
tRegionReplicaSet.getDataNodeLocations()) {
+ size += sizeOfTDataNodeLocation(tDataNodeLocation);
+ }
+ }
+ return size;
+ }
+
+ private static long sizeOfTConsensusGroupId() {
+ // objectHeader + type + id
+ return T_CONSENSUS_GROUP_ID_SIZE;
+ }
+
+ private static long sizeOfTDataNodeLocation(final TDataNodeLocation
tDataNodeLocation) {
+ if (tDataNodeLocation == null) {
+ return 0L;
+ }
+ long size = T_DATA_NODE_LOCATION_SIZE;
+
+ size += sizeOfTEndPoint(tDataNodeLocation.getClientRpcEndPoint());
+ size += sizeOfTEndPoint(tDataNodeLocation.getInternalEndPoint());
+ size += sizeOfTEndPoint(tDataNodeLocation.getMPPDataExchangeEndPoint());
+ size +=
sizeOfTEndPoint(tDataNodeLocation.getDataRegionConsensusEndPoint());
+ size +=
sizeOfTEndPoint(tDataNodeLocation.getSchemaRegionConsensusEndPoint());
+
+ return size;
+ }
+
+ private static long sizeOfTEndPoint(final TEndPoint tEndPoint) {
+ if (tEndPoint == null) {
+ return 0L;
+ }
+ // objectHeader + ip + port
+ long size = T_END_POINT_SIZE;
+
+ size += sizeOfString(tEndPoint.ip);
+ return size;
+ }
+
+ private static long sizeOfTSStatus(final TSStatus tSStatus) {
+ if (tSStatus == null) {
+ return 0L;
+ }
+ long size = TS_STATUS_SIZE;
+ // message
+ if (tSStatus.isSetMessage()) {
+ size += sizeOfString(tSStatus.message);
+ }
+ // ignore subStatus
+ // redirectNode
+ if (tSStatus.isSetRedirectNode()) {
+ size += sizeOfTEndPoint(tSStatus.redirectNode);
+ }
+ return size;
+ }
+
+ //
=============================ProgressIndex==================================
+
+ private static long sizeOfProgressIndex(final ProgressIndex progressIndex) {
+ if (progressIndex == null) {
+ return 0L;
+ }
+ switch (progressIndex.getType()) {
+ case HYBRID_PROGRESS_INDEX:
+ return sizeOfHybridProgressIndex((HybridProgressIndex) progressIndex);
+ case IOT_PROGRESS_INDEX:
+ return sizeOfIoTProgressIndex((IoTProgressIndex) progressIndex);
+ case META_PROGRESS_INDEX:
+ return sizeOfMetaProgressIndex();
+ case STATE_PROGRESS_INDEX:
+ return sizeOfStateProgressIndex((StateProgressIndex) progressIndex);
+ case SIMPLE_PROGRESS_INDEX:
+ return sizeOfSimpleProgressIndex();
+ case MINIMUM_PROGRESS_INDEX:
+ return 0L;
+ case RECOVER_PROGRESS_INDEX:
+ return sizeOfRecoverProgressIndex((RecoverProgressIndex)
progressIndex);
+ case TIME_WINDOW_STATE_PROGRESS_INDEX:
+ return
sizeOfTimeWindowStateProgressIndex((TimeWindowStateProgressIndex)
progressIndex);
+ }
+ return 0L;
+ }
+
+ private static long sizeOfHybridProgressIndex(final HybridProgressIndex
progressIndex) {
+ // Memory alignment of basic types and reference types in structures
+ long size = HYBRID_PROGRESS_INDEX_SIZE;
+
+ // Memory calculation in reference type, cannot get exact value, roughly
estimate
+ size += REENTRANT_READ_WRITE_LOCK_SIZE;
+ if (progressIndex.getType2Index() != null) {
+ // ignore ProgressIndex
+ size +=
+ NUM_BYTES_OBJECT_HEADER
+ + progressIndex.getType2Index().size()
+ * (SIZE_OF_SHORT +
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ }
+ return size;
+ }
+
+ private static long sizeOfIoTProgressIndex(IoTProgressIndex progressIndex) {
+ // Memory alignment of basic types and reference types in structures
+ long size = IOT_PROGRESS_INDEX_SIZE;
+
+ // Memory calculation in reference type, cannot get exact value, roughly
estimate
+ size += REENTRANT_READ_WRITE_LOCK_SIZE;
+
+ size +=
+ NUM_BYTES_OBJECT_HEADER
+ + progressIndex.getPeerId2SearchIndexSize()
+ * (SIZE_OF_INT + SIZE_OF_LONG +
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+
+ return size;
+ }
+
+ private static long sizeOfMetaProgressIndex() {
+ // Memory alignment of basic types and reference types in structures
+ return META_PROGRESS_INDEX_SIZE + REENTRANT_READ_WRITE_LOCK_SIZE;
+ }
+
+ private static long sizeOfRecoverProgressIndex(RecoverProgressIndex
progressIndex) {
+ // Memory alignment of basic types and reference types in structures
+ long size = RECOVER_PROGRESS_INDEX_SIZE;
+
+ // Memory calculation in reference type, cannot get exact value, roughly
estimate
+ size += REENTRANT_READ_WRITE_LOCK_SIZE;
+ if (progressIndex.getDataNodeId2LocalIndex() != null) {
+ size +=
+ NUM_BYTES_OBJECT_HEADER
+ + progressIndex.getDataNodeId2LocalIndex().size()
+ * (SIZE_OF_INT + SIZE_OF_LONG +
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ }
+ return size;
+ }
+
+ private static long sizeOfSimpleProgressIndex() {
+ // Memory alignment of basic types and reference types in structures
+ return SIMPLE_PROGRESS_INDEX_SIZE;
+ }
+
+ private static long sizeOfStateProgressIndex(StateProgressIndex
progressIndex) {
+ // Memory alignment of basic types and reference types in structures
+ long size = STATE_PROGRESS_INDEX_SIZE;
+
+ // Memory calculation in reference type, cannot get exact value, roughly
estimate
+ size += REENTRANT_READ_WRITE_LOCK_SIZE;
+ if (progressIndex.getState() != null) {
+ size += NUM_BYTES_OBJECT_HEADER;
+ for (Map.Entry<String, Binary> entry :
progressIndex.getState().entrySet()) {
+ size +=
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ + sizeOfString(entry.getKey())
+ + sizeOfBinary(entry.getValue());
+ }
+ }
+ return size;
+ }
+
+ private static long sizeOfTimeWindowStateProgressIndex(
+ TimeWindowStateProgressIndex progressIndex) {
+ // Memory alignment of basic types and reference types in structures
+ long size = TIME_WINDOW_STATE_PROGRESS_INDEX_SIZE;
+
+ // Memory calculation in reference type, cannot get exact value, roughly
estimate
+ size += REENTRANT_READ_WRITE_LOCK_SIZE;
+ if (progressIndex.getTimeSeries2TimestampWindowBufferPairMap() != null) {
+ size += NUM_BYTES_OBJECT_HEADER;
+ for (Map.Entry<String, Pair<Long, ByteBuffer>> entry :
+
progressIndex.getTimeSeries2TimestampWindowBufferPairMap().entrySet()) {
+ size +=
+ sizeOfString(entry.getKey())
+ + SIZE_OF_LONG * 2
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ }
+ }
+ return size;
+ }
+
+ // =============================Write==================================
+
+ private static long sizeOfBinary(Binary binary) {
+ if (binary == null) {
+ return 0;
+ }
+ // -----header----
+ // -----ref-------
+ // ---------------
+ // --arrayHeader--
+ // ----values-----
+ return RamUsageEstimator.alignObjectSize(NUM_BYTES_OBJECT_HEADER +
NUM_BYTES_OBJECT_REF)
+ + RamUsageEstimator.alignObjectSize(NUM_BYTES_ARRAY_HEADER +
binary.getValues().length);
+ }
+
+ private static long sizeOfString(String value) {
+ if (value == null) {
+ return 0;
+ }
+ // -----header----
+ // -----ref-------
+ // ---------------
+ // --arrayHeader--
+ // ----values-----
+ return SIZE_OF_STRING
+ + RamUsageEstimator.alignObjectSize(NUM_BYTES_ARRAY_HEADER +
value.length());
+ }
+
+ private static long sizeOfStringArray(final String[] values) {
+ if (values == null) {
+ return 0;
+ }
+ long size =
+ RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + values.length * NUM_BYTES_OBJECT_REF);
+ for (String value : values) {
+ size += sizeOfString(value);
+ }
+ return size;
+ }
+
+ private static long sizeOfTimes(final long[] times) {
+ if (times == null) {
+ return 0;
+ }
+ long size = NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * times.length;
+ return RamUsageEstimator.alignObjectSize(size);
+ }
+
+ private static long sizeOfBitMapArray(BitMap[] bitMaps) {
+ if (bitMaps == null) {
+ return 0L;
+ }
+ long size =
+ RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * bitMaps.length);
+ for (BitMap bitMap : bitMaps) {
+ size += sizeOfBitMap(bitMap);
+ }
+ return size;
+ }
+
+ private static long sizeOfBitMap(final BitMap bitMaps) {
+ if (bitMaps == null) {
+ return 0L;
+ }
+ long size = BIT_MAP_SIZE;
+
+ size +=
+ RamUsageEstimator.alignObjectSize(NUM_BYTES_ARRAY_HEADER +
bitMaps.getByteArray().length);
+ return size;
+ }
+
+ private static long sizeOfColumns(
+ final Object[] columns, final MeasurementSchema[] measurementSchemas) {
+ long size =
+ RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length);
+ for (int i = 0; i < columns.length; i++) {
+ switch (measurementSchemas[i].getType()) {
+ case INT64:
+ case TIMESTAMP:
+ {
+ size += RamUsageEstimator.sizeOf((long[]) columns[i]);
+ break;
+ }
+ case DATE:
+ case INT32:
+ {
+ size += RamUsageEstimator.sizeOf((int[]) columns[i]);
+ break;
+ }
+ case DOUBLE:
+ {
+ size += RamUsageEstimator.sizeOf((double[]) columns[i]);
+ break;
+ }
+ case FLOAT:
+ {
+ size += RamUsageEstimator.sizeOf((float[]) columns[i]);
+ break;
+ }
+ case BOOLEAN:
+ {
+ size += RamUsageEstimator.sizeOf((boolean[]) columns[i]);
+ break;
+ }
+ case STRING:
+ case TEXT:
+ case BLOB:
+ {
+ final Binary[] values = (Binary[]) columns[i];
+ size +=
+ RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF *
values.length);
+ for (Binary value : values) {
+ size += sizeOfBinary(value);
+ }
+ break;
+ }
+ }
+ }
+ return size;
+ }
+
+ private static long sizeOfValues(
+ final Object[] columns, final MeasurementSchema[] measurementSchemas) {
+ long size =
+ RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length);
+ for (int i = 0; i < columns.length; i++) {
+ switch (measurementSchemas[i].getType()) {
+ case INT64:
+ case TIMESTAMP:
+ {
+ size += SIZE_OF_LONG;
+ break;
+ }
+ case DATE:
+ case INT32:
+ {
+ size += SIZE_OF_INT;
+ break;
+ }
+ case DOUBLE:
+ {
+ size += SIZE_OF_DOUBLE;
+ break;
+ }
+ case FLOAT:
+ {
+ size += SIZE_OF_FLOAT;
+ break;
+ }
+ case BOOLEAN:
+ {
+ size += SIZE_OF_BOOLEAN;
+ break;
+ }
+ case STRING:
+ case TEXT:
+ case BLOB:
+ {
+ final Binary binary = (Binary) columns[i];
+ size += sizeOfBinary(binary);
+ }
+ }
+ }
+ return size;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 06581922e1b..f667e955324 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -156,6 +156,10 @@ public abstract class InsertNode extends SearchNode
implements ComparableConsens
this.deviceID = deviceID;
}
+ public boolean isDeviceIDExists() {
+ return deviceID != null;
+ }
+
public boolean isGeneratedByRemoteConsensusLeader() {
switch (config.getDataRegionConsensusProtocolClass()) {
case ConsensusFactory.IOT_CONSENSUS:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index fb4f6e70d9b..958b429ae2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.metric.PipeWALInsertNodeCacheMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -80,17 +81,6 @@ public class WALInsertNodeCache {
PipeDataNodeResourceManager.memory()
.tryAllocate(requestedAllocateSize)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
- .setShrinkCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor * ((double) oldMemory / newMemory));
- isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
- LOGGER.info(
- "WALInsertNodeCache.allocatedMemoryBlock of dataRegion
{} has shrunk from {} to {}.",
- dataRegionId,
- oldMemory,
- newMemory);
- })
.setExpandMethod(
oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
requestedAllocateSize))
.setExpandCallback(
@@ -112,8 +102,15 @@ public class WALInsertNodeCache {
.weigher(
(Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
(position, pair) -> {
- final long weightInLong =
- (long) (position.getSize() *
memoryUsageCheatFactor.get());
+ long weightInLong = 0L;
+ if (pair.right != null) {
+ weightInLong =
+ (long)
+ (InsertNodeMemoryEstimator.sizeOf(pair.right)
+ * memoryUsageCheatFactor.get());
+ } else {
+ weightInLong = (long) (position.getSize() *
memoryUsageCheatFactor.get());
+ }
if (weightInLong <= 0) {
return Integer.MAX_VALUE;
}
@@ -122,6 +119,27 @@ public class WALInsertNodeCache {
})
.recordStats()
.build(new WALInsertNodeCacheLoader());
+ allocatedMemoryBlock.setShrinkCallback(
+ (oldMemory, newMemory) -> {
+ memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double)
oldMemory / newMemory));
+ isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
+ LOGGER.info(
+ "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has
shrunk from {} to {}.",
+ dataRegionId,
+ oldMemory,
+ newMemory);
+ if (CONFIG.getWALCacheShrinkClearEnabled()) {
+ try {
+ lruCache.cleanUp();
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to clear WALInsertNodeCache for dataRegion ID: {}.",
dataRegionId, e);
+ return;
+ }
+ LOGGER.info(
+ "Successfully cleared WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId);
+ }
+ });
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index d1fa57c9406..a95faea31e8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -200,6 +200,15 @@ public class IoTProgressIndex extends ProgressIndex {
}
}
+ public int getPeerId2SearchIndexSize() {
+ lock.readLock().lock();
+ try {
+ return peerId2SearchIndex.size();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
public static IoTProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
final int size = ReadWriteIOUtils.readInt(byteBuffer);