This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a72ebf6159 Pipe: Fixed multiple leakages of InsertNodeMemoryEstimator 
(#17657)
3a72ebf6159 is described below

commit 3a72ebf61596624280bc29c50ae47d7fbd97ca69
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 21:37:18 2026 +0800

    Pipe: Fixed multiple leakages of InsertNodeMemoryEstimator (#17657)
    
    * Fix
    
    * sptls
---
 .../resource/memory/InsertNodeMemoryEstimator.java | 405 +++++++++++++--------
 .../memory/InsertNodeMemoryEstimatorTest.java      | 265 ++++++++++++++
 2 files changed, 512 insertions(+), 158 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
index 7f1d7357b02..9522b5fb17e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -50,9 +51,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 public class InsertNodeMemoryEstimator {
 
@@ -183,210 +187,125 @@ public class InsertNodeMemoryEstimator {
   // =============================InsertNode==================================
 
   private static long calculateFullInsertNodeSize(final InsertNode node) {
+    return calculateFullInsertNodeSize(node, null);
+  }
+
+  private static long calculateFullInsertNodeSize(
+      final InsertNode node, final Set<Object> deduplicatedObjects) {
     long size = 0;
+    // PlanNodeId
+    size += sizeOfPlanNodeId(node.getPlanNodeId(), deduplicatedObjects);
     // PartialPath
-    size += sizeOfPartialPath(node.getTargetPath());
+    size += sizeOfPartialPath(node.getTargetPath(), deduplicatedObjects);
     // MeasurementSchemas
-    size += sizeOfMeasurementSchemas(node.getMeasurementSchemas());
+    size += sizeOfMeasurementSchemas(node.getMeasurementSchemas(), 
deduplicatedObjects);
     // Measurement
-    size += RamUsageEstimator.sizeOf(node.getMeasurements());
+    size += sizeOfStringArray(node.getMeasurements(), deduplicatedObjects);
     // dataTypes
-    size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
+    size += sizeOfShallowObject(node.getDataTypes(), deduplicatedObjects);
     // columnCategories
-    size += RamUsageEstimator.shallowSizeOf(node.getColumnCategories());
+    size += sizeOfShallowObject(node.getColumnCategories(), 
deduplicatedObjects);
     // idColumnIndices
     size += sizeOfColumnIndices(node.getColumnCategories());
     // deviceID
     if (node.isDeviceIDExists()) {
-      size += sizeOfIDeviceID(node.getDeviceID());
+      size += sizeOfIDeviceID(node.getDeviceID(), deduplicatedObjects);
     }
     // dataRegionReplicaSet
-    size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet());
+    size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet(), 
deduplicatedObjects);
     // progressIndex
-    size += sizeOfProgressIndex(node.getProgressIndex());
-    return size;
-  }
-
-  private static long calculateInsertNodeSizeExcludingSchemas(final InsertNode 
node) {
-    // Measurement
-    long size = 2 * 
RamUsageEstimator.shallowSizeOf(node.getMeasurementSchemas());
-    // dataTypes
-    size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
-    // columnCategories
-    size += RamUsageEstimator.shallowSizeOf(node.getColumnCategories());
-    // idColumnIndices
-    size += sizeOfColumnIndices(node.getColumnCategories());
-    // deviceID
-    if (node.isDeviceIDExists()) {
-      size += sizeOfIDeviceID(node.getDeviceID());
-    }
-    // dataRegionReplicaSet
-    size += sizeOfTRegionReplicaSet(node.getRegionReplicaSet());
-    // progressIndex
-    size += sizeOfProgressIndex(node.getProgressIndex());
+    size += sizeOfProgressIndex(node.getProgressIndex(), deduplicatedObjects);
     return size;
   }
 
   private static long sizeOfInsertTabletNode(final InsertTabletNode node) {
-    long size = INSERT_TABLET_NODE_SIZE;
-    size += calculateFullInsertNodeSize(node);
-    size += RamUsageEstimator.sizeOf(node.getTimes());
-    size += RamUsageEstimator.sizeOf(node.getBitMaps());
-    size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
-    final List<Integer> range = node.getRange();
-    if (range != null) {
-      size += NUM_BYTES_OBJECT_HEADER + SIZE_OF_INT * range.size();
-    }
-    return size;
+    return sizeOfInsertTabletNode(node, newDeduplicatedObjectSet());
   }
 
-  private static long calculateInsertTabletNodeSizeExcludingSchemas(final 
InsertTabletNode node) {
+  private static long sizeOfInsertTabletNode(
+      final InsertTabletNode node, final Set<Object> deduplicatedObjects) {
     long size = INSERT_TABLET_NODE_SIZE;
-
-    size += calculateInsertNodeSizeExcludingSchemas(node);
-
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
     size += RamUsageEstimator.sizeOf(node.getTimes());
-
     size += RamUsageEstimator.sizeOf(node.getBitMaps());
-
     size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
-
-    final List<Integer> range = node.getRange();
-    if (range != null) {
-      size += NUM_BYTES_OBJECT_HEADER + SIZE_OF_INT * range.size();
-    }
+    size += sizeOfIntegerList(node.getRange());
     return size;
   }
 
   private static long sizeOfInsertRowNode(final InsertRowNode node) {
-    long size = INSERT_ROW_NODE_SIZE;
-    size += calculateFullInsertNodeSize(node);
-    size += sizeOfValues(node.getValues(), node.getMeasurementSchemas());
-    return size;
+    return sizeOfInsertRowNode(node, newDeduplicatedObjectSet());
   }
 
-  private static long calculateInsertRowNodeExcludingSchemas(final 
InsertRowNode node) {
+  private static long sizeOfInsertRowNode(
+      final InsertRowNode node, final Set<Object> deduplicatedObjects) {
     long size = INSERT_ROW_NODE_SIZE;
-    size += calculateInsertNodeSizeExcludingSchemas(node);
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
     size += sizeOfValues(node.getValues(), node.getMeasurementSchemas());
     return size;
   }
 
   private static long sizeOfInsertRowsNode(final InsertRowsNode node) {
+    final Set<Object> deduplicatedObjects = newDeduplicatedObjectSet();
     long size = INSERT_ROWS_NODE_SIZE;
-    size += calculateFullInsertNodeSize(node);
-    final List<InsertRowNode> rows = node.getInsertRowNodeList();
-    final List<Integer> indexList = node.getInsertRowNodeIndexList();
-    if (rows != null && !rows.isEmpty()) {
-      // InsertRowNodeList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size +=
-          (calculateInsertRowNodeExcludingSchemas(rows.get(0)) + 
NUM_BYTES_OBJECT_REF)
-              * rows.size();
-      size += sizeOfPartialPath(rows.get(0).getTargetPath());
-      size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
-      // InsertRowNodeIndexList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
-    }
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
+    size += sizeOfInsertRowNodeList(node.getInsertRowNodeList(), 
deduplicatedObjects);
+    size += sizeOfIntegerList(node.getInsertRowNodeIndexList());
+    size += sizeOfResults(node.getResults());
     return size;
   }
 
   private static long sizeOfInsertRowsOfOneDeviceNode(final 
InsertRowsOfOneDeviceNode node) {
+    final Set<Object> deduplicatedObjects = newDeduplicatedObjectSet();
     long size = INSERT_ROWS_OF_ONE_DEVICE_NODE_SIZE;
-    size += calculateFullInsertNodeSize(node);
-    final List<InsertRowNode> rows = node.getInsertRowNodeList();
-    final List<Integer> indexList = node.getInsertRowNodeIndexList();
-    if (rows != null && !rows.isEmpty()) {
-      // InsertRowNodeList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size +=
-          (calculateInsertRowNodeExcludingSchemas(rows.get(0)) + 
NUM_BYTES_OBJECT_REF)
-              * rows.size();
-      size += sizeOfPartialPath(rows.get(0).getTargetPath());
-      size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
-      // InsertRowNodeIndexList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
-    }
-    // results
-    size += NUM_BYTES_OBJECT_HEADER;
-    for (Map.Entry<Integer, TSStatus> entry : node.getResults().entrySet()) {
-      size +=
-          Integer.BYTES
-              + sizeOfTSStatus(entry.getValue())
-              + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
-    }
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
+    size += sizeOfInsertRowNodeList(node.getInsertRowNodeList(), 
deduplicatedObjects);
+    size += sizeOfIntegerList(node.getInsertRowNodeIndexList());
+    size += sizeOfResults(node.getResults());
     return size;
   }
 
   private static long sizeOfInsertMultiTabletsNode(final 
InsertMultiTabletsNode node) {
+    final Set<Object> deduplicatedObjects = newDeduplicatedObjectSet();
     long size = INSERT_MULTI_TABLETS_NODE_SIZE;
-    size += calculateFullInsertNodeSize(node);
-    // dataTypes
-    size += RamUsageEstimator.shallowSizeOf(node.getDataTypes());
-    // columnCategories
-    size += RamUsageEstimator.shallowSizeOf(node.getColumnCategories());
-
-    final List<InsertTabletNode> rows = node.getInsertTabletNodeList();
-    final List<Integer> indexList = node.getParentInsertTabletNodeIndexList();
-    if (rows != null && !rows.isEmpty()) {
-      // InsertTabletNodeList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size +=
-          (calculateInsertTabletNodeSizeExcludingSchemas(rows.get(0)) + 
NUM_BYTES_OBJECT_REF)
-              * rows.size();
-      size += sizeOfPartialPath(rows.get(0).getTargetPath());
-      size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
-      // ParentInsertTabletNodeIndexList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
-    }
-    // results
-    if (node.getResults() != null) {
-      size += NUM_BYTES_OBJECT_HEADER;
-      for (Map.Entry<Integer, TSStatus> entry : node.getResults().entrySet()) {
-        size +=
-            Integer.BYTES
-                + sizeOfTSStatus(entry.getValue())
-                + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
-      }
-    }
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
+    size += sizeOfInsertTabletNodeList(node.getInsertTabletNodeList(), 
deduplicatedObjects);
+    size += sizeOfIntegerList(node.getParentInsertTabletNodeIndexList());
+    size += sizeOfResults(node.getResults());
     return size;
   }
 
   private static long sizeOfRelationalInsertRowsNode(final 
RelationalInsertRowsNode node) {
+    final Set<Object> deduplicatedObjects = newDeduplicatedObjectSet();
     long size = RELATIONAL_INSERT_ROWS_NODE_SIZE;
-    size += calculateFullInsertNodeSize(node);
-    final List<InsertRowNode> rows = node.getInsertRowNodeList();
-    final List<Integer> indexList = node.getInsertRowNodeIndexList();
-    if (rows != null && !rows.isEmpty()) {
-      // InsertRowNodeList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size +=
-          (calculateInsertRowNodeExcludingSchemas(rows.get(0)) + 
NUM_BYTES_OBJECT_REF)
-              * rows.size();
-      size += sizeOfPartialPath(rows.get(0).getTargetPath());
-      size += sizeOfMeasurementSchemas(rows.get(0).getMeasurementSchemas());
-      // InsertRowNodeIndexList
-      size += NUM_BYTES_OBJECT_HEADER;
-      size += (long) indexList.size() * (SIZE_OF_INT + NUM_BYTES_OBJECT_REF);
-    }
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
+    size += sizeOfInsertRowNodeList(node.getInsertRowNodeList(), 
deduplicatedObjects);
+    size += sizeOfIntegerList(node.getInsertRowNodeIndexList());
     // ignore deviceIDs
     return size;
   }
 
   private static long sizeOfRelationalInsertRowNode(final 
RelationalInsertRowNode node) {
+    return sizeOfRelationalInsertRowNode(node, newDeduplicatedObjectSet());
+  }
+
+  private static long sizeOfRelationalInsertRowNode(
+      final RelationalInsertRowNode node, final Set<Object> 
deduplicatedObjects) {
     long size = RELATIONAL_INSERT_ROW_NODE_SIZE;
-    size += calculateFullInsertNodeSize(node);
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
     size += sizeOfValues(node.getValues(), node.getMeasurementSchemas());
     return size;
   }
 
   private static long sizeOfRelationalInsertTabletNode(final 
RelationalInsertTabletNode node) {
+    return sizeOfRelationalInsertTabletNode(node, newDeduplicatedObjectSet());
+  }
+
+  private static long sizeOfRelationalInsertTabletNode(
+      final RelationalInsertTabletNode node, final Set<Object> 
deduplicatedObjects) {
     long size = RELATIONAL_INSERT_TABLET_NODE_SIZE;
 
-    size += calculateFullInsertNodeSize(node);
+    size += calculateFullInsertNodeSize(node, deduplicatedObjects);
 
     size += RamUsageEstimator.sizeOf(node.getTimes());
 
@@ -394,10 +313,7 @@ public class InsertNodeMemoryEstimator {
 
     size += sizeOfColumns(node.getColumns(), node.getMeasurementSchemas());
 
-    final List<Integer> range = node.getRange();
-    if (range != null) {
-      size += NUM_BYTES_OBJECT_HEADER + (NUM_BYTES_OBJECT_REF + Integer.BYTES) 
* range.size();
-    }
+    size += sizeOfIntegerList(node.getRange());
     // ignore deviceIDs
     return size;
   }
@@ -405,9 +321,17 @@ public class InsertNodeMemoryEstimator {
   // ============================Device And 
Measurement===================================
 
   public static long sizeOfPartialPath(final PartialPath partialPath) {
+    return sizeOfPartialPath(partialPath, null);
+  }
+
+  private static long sizeOfPartialPath(
+      final PartialPath partialPath, final Set<Object> deduplicatedObjects) {
     if (partialPath == null) {
       return 0L;
     }
+    if (!shouldCountObject(partialPath, deduplicatedObjects)) {
+      return 0L;
+    }
     long size = PARTIAL_PATH_SIZE;
     final String[] nodes = partialPath.getNodes();
     if (nodes != null) {
@@ -420,22 +344,38 @@ public class InsertNodeMemoryEstimator {
   }
 
   public static long sizeOfMeasurementSchemas(final MeasurementSchema[] 
measurementSchemas) {
+    return sizeOfMeasurementSchemas(measurementSchemas, null);
+  }
+
+  private static long sizeOfMeasurementSchemas(
+      final MeasurementSchema[] measurementSchemas, final Set<Object> 
deduplicatedObjects) {
     if (measurementSchemas == null) {
       return 0L;
     }
+    if (!shouldCountObject(measurementSchemas, deduplicatedObjects)) {
+      return 0L;
+    }
     long size =
         RamUsageEstimator.alignObjectSize(
             NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * 
measurementSchemas.length);
     for (MeasurementSchema measurementSchema : measurementSchemas) {
-      size += sizeOfMeasurementSchema(measurementSchema);
+      size += sizeOfMeasurementSchema(measurementSchema, deduplicatedObjects);
     }
     return size;
   }
 
   public static long sizeOfMeasurementSchema(final MeasurementSchema 
measurementSchema) {
+    return sizeOfMeasurementSchema(measurementSchema, null);
+  }
+
+  private static long sizeOfMeasurementSchema(
+      final MeasurementSchema measurementSchema, final Set<Object> 
deduplicatedObjects) {
     if (measurementSchema == null) {
       return 0L;
     }
+    if (!shouldCountObject(measurementSchema, deduplicatedObjects)) {
+      return 0L;
+    }
     // Header + primitive + reference
     long size = MEASUREMENT_SCHEMA_SIZE;
     // measurementId
@@ -443,7 +383,7 @@ public class InsertNodeMemoryEstimator {
     // props
     final Map<String, String> props = measurementSchema.getProps();
     if (props != null) {
-      size += NUM_BYTES_OBJECT_HEADER;
+      size += RamUsageEstimator.shallowSizeOf(props);
       for (Map.Entry<String, String> entry : props.entrySet()) {
         size +=
             RamUsageEstimator.sizeOf(entry.getKey())
@@ -471,15 +411,30 @@ public class InsertNodeMemoryEstimator {
   }
 
   public static long sizeOfIDeviceID(final IDeviceID deviceID) {
-    return Objects.nonNull(deviceID) ? deviceID.ramBytesUsed() : 0L;
+    return sizeOfIDeviceID(deviceID, null);
+  }
+
+  private static long sizeOfIDeviceID(
+      final IDeviceID deviceID, final Set<Object> deduplicatedObjects) {
+    return Objects.nonNull(deviceID) && shouldCountObject(deviceID, 
deduplicatedObjects)
+        ? deviceID.ramBytesUsed()
+        : 0L;
   }
 
   // =============================Thrift==================================
 
   private static long sizeOfTRegionReplicaSet(final TRegionReplicaSet 
tRegionReplicaSet) {
+    return sizeOfTRegionReplicaSet(tRegionReplicaSet, null);
+  }
+
+  private static long sizeOfTRegionReplicaSet(
+      final TRegionReplicaSet tRegionReplicaSet, final Set<Object> 
deduplicatedObjects) {
     if (tRegionReplicaSet == null) {
       return 0L;
     }
+    if (!shouldCountObject(tRegionReplicaSet, deduplicatedObjects)) {
+      return 0L;
+    }
     // Memory alignment of basic types and reference types in structures
     long size = T_REGION_REPLICA_SET_SIZE;
     // Memory calculation in reference type, cannot get exact value, roughly 
estimate
@@ -487,9 +442,9 @@ public class InsertNodeMemoryEstimator {
       size += sizeOfTConsensusGroupId();
     }
     if (tRegionReplicaSet.isSetDataNodeLocations()) {
-      size += NUM_BYTES_OBJECT_HEADER;
+      size += sizeOfObjectList(tRegionReplicaSet.getDataNodeLocations());
       for (TDataNodeLocation tDataNodeLocation : 
tRegionReplicaSet.getDataNodeLocations()) {
-        size += sizeOfTDataNodeLocation(tDataNodeLocation);
+        size += sizeOfTDataNodeLocation(tDataNodeLocation, 
deduplicatedObjects);
       }
     }
     return size;
@@ -500,25 +455,39 @@ public class InsertNodeMemoryEstimator {
     return T_CONSENSUS_GROUP_ID_SIZE;
   }
 
-  private static long sizeOfTDataNodeLocation(final TDataNodeLocation 
tDataNodeLocation) {
+  private static long sizeOfTDataNodeLocation(
+      final TDataNodeLocation tDataNodeLocation, final Set<Object> 
deduplicatedObjects) {
     if (tDataNodeLocation == null) {
       return 0L;
     }
+    if (!shouldCountObject(tDataNodeLocation, deduplicatedObjects)) {
+      return 0L;
+    }
     long size = T_DATA_NODE_LOCATION_SIZE;
 
-    size += sizeOfTEndPoint(tDataNodeLocation.getClientRpcEndPoint());
-    size += sizeOfTEndPoint(tDataNodeLocation.getInternalEndPoint());
-    size += sizeOfTEndPoint(tDataNodeLocation.getMPPDataExchangeEndPoint());
-    size += 
sizeOfTEndPoint(tDataNodeLocation.getDataRegionConsensusEndPoint());
-    size += 
sizeOfTEndPoint(tDataNodeLocation.getSchemaRegionConsensusEndPoint());
+    size += sizeOfTEndPoint(tDataNodeLocation.getClientRpcEndPoint(), 
deduplicatedObjects);
+    size += sizeOfTEndPoint(tDataNodeLocation.getInternalEndPoint(), 
deduplicatedObjects);
+    size += sizeOfTEndPoint(tDataNodeLocation.getMPPDataExchangeEndPoint(), 
deduplicatedObjects);
+    size +=
+        sizeOfTEndPoint(tDataNodeLocation.getDataRegionConsensusEndPoint(), 
deduplicatedObjects);
+    size +=
+        sizeOfTEndPoint(tDataNodeLocation.getSchemaRegionConsensusEndPoint(), 
deduplicatedObjects);
 
     return size;
   }
 
   private static long sizeOfTEndPoint(final TEndPoint tEndPoint) {
+    return sizeOfTEndPoint(tEndPoint, null);
+  }
+
+  private static long sizeOfTEndPoint(
+      final TEndPoint tEndPoint, final Set<Object> deduplicatedObjects) {
     if (tEndPoint == null) {
       return 0L;
     }
+    if (!shouldCountObject(tEndPoint, deduplicatedObjects)) {
+      return 0L;
+    }
     // objectHeader + ip + port
     long size = T_END_POINT_SIZE;
 
@@ -527,18 +496,32 @@ public class InsertNodeMemoryEstimator {
   }
 
   private static long sizeOfTSStatus(final TSStatus tSStatus) {
+    return sizeOfTSStatus(tSStatus, null);
+  }
+
+  private static long sizeOfTSStatus(
+      final TSStatus tSStatus, final Set<Object> deduplicatedObjects) {
     if (tSStatus == null) {
       return 0L;
     }
+    if (!shouldCountObject(tSStatus, deduplicatedObjects)) {
+      return 0L;
+    }
     long size = TS_STATUS_SIZE;
     // message
     if (tSStatus.isSetMessage()) {
       size += RamUsageEstimator.sizeOf(tSStatus.message);
     }
-    // ignore subStatus
+    // subStatus
+    if (tSStatus.getSubStatus() != null) {
+      size += sizeOfObjectList(tSStatus.getSubStatus());
+      for (TSStatus subStatus : tSStatus.getSubStatus()) {
+        size += sizeOfTSStatus(subStatus, deduplicatedObjects);
+      }
+    }
     // redirectNode
     if (tSStatus.isSetRedirectNode()) {
-      size += sizeOfTEndPoint(tSStatus.redirectNode);
+      size += sizeOfTEndPoint(tSStatus.redirectNode, deduplicatedObjects);
     }
     return size;
   }
@@ -546,7 +529,14 @@ public class InsertNodeMemoryEstimator {
   // 
=============================ProgressIndex==================================
 
   private static long sizeOfProgressIndex(final ProgressIndex progressIndex) {
-    return Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0L;
+    return sizeOfProgressIndex(progressIndex, null);
+  }
+
+  private static long sizeOfProgressIndex(
+      final ProgressIndex progressIndex, final Set<Object> 
deduplicatedObjects) {
+    return Objects.nonNull(progressIndex) && shouldCountObject(progressIndex, 
deduplicatedObjects)
+        ? progressIndex.ramBytesUsed()
+        : 0L;
   }
 
   // =============================Write==================================
@@ -674,4 +664,103 @@ public class InsertNodeMemoryEstimator {
     }
     return size;
   }
+
+  private static long sizeOfPlanNodeId(
+      final PlanNodeId planNodeId, final Set<Object> deduplicatedObjects) {
+    return planNodeId != null && shouldCountObject(planNodeId, 
deduplicatedObjects)
+        ? planNodeId.ramBytesUsed()
+        : 0L;
+  }
+
+  private static long sizeOfStringArray(
+      final String[] strings, final Set<Object> deduplicatedObjects) {
+    return strings != null && shouldCountObject(strings, deduplicatedObjects)
+        ? RamUsageEstimator.sizeOf(strings)
+        : 0L;
+  }
+
+  private static long sizeOfShallowObject(
+      final Object object, final Set<Object> deduplicatedObjects) {
+    return object != null && shouldCountObject(object, deduplicatedObjects)
+        ? RamUsageEstimator.shallowSizeOf(object)
+        : 0L;
+  }
+
+  private static long sizeOfInsertRowNodeList(
+      final List<InsertRowNode> rows, final Set<Object> deduplicatedObjects) {
+    if (rows == null) {
+      return 0L;
+    }
+    long size = sizeOfObjectList(rows);
+    for (InsertRowNode row : rows) {
+      size += sizeOfContainedInsertRowNode(row, deduplicatedObjects);
+    }
+    return size;
+  }
+
+  private static long sizeOfInsertTabletNodeList(
+      final List<InsertTabletNode> tablets, final Set<Object> 
deduplicatedObjects) {
+    if (tablets == null) {
+      return 0L;
+    }
+    long size = sizeOfObjectList(tablets);
+    for (InsertTabletNode tablet : tablets) {
+      size += sizeOfInsertTabletNode(tablet, deduplicatedObjects);
+    }
+    return size;
+  }
+
+  private static long sizeOfContainedInsertRowNode(
+      final InsertRowNode node, final Set<Object> deduplicatedObjects) {
+    return node instanceof RelationalInsertRowNode
+        ? sizeOfRelationalInsertRowNode((RelationalInsertRowNode) node, 
deduplicatedObjects)
+        : sizeOfInsertRowNode(node, deduplicatedObjects);
+  }
+
+  private static long sizeOfObjectList(final List<?> list) {
+    if (list == null) {
+      return 0L;
+    }
+    long size = RamUsageEstimator.shallowSizeOf(list);
+    if (list instanceof ArrayList) {
+      size +=
+          RamUsageEstimator.alignObjectSize(
+              NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * list.size());
+    }
+    return size;
+  }
+
+  private static long sizeOfIntegerList(final List<Integer> integers) {
+    if (integers == null) {
+      return 0L;
+    }
+    long size = sizeOfObjectList(integers);
+    for (Integer ignored : integers) {
+      size += SIZE_OF_INT;
+    }
+    return size;
+  }
+
+  private static long sizeOfResults(final Map<Integer, TSStatus> results) {
+    if (results == null) {
+      return 0L;
+    }
+    long size = RamUsageEstimator.shallowSizeOf(results);
+    for (Map.Entry<Integer, TSStatus> entry : results.entrySet()) {
+      size +=
+          SIZE_OF_INT
+              + sizeOfTSStatus(entry.getValue())
+              + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+    }
+    return size;
+  }
+
+  private static Set<Object> newDeduplicatedObjectSet() {
+    return Collections.newSetFromMap(new IdentityHashMap<>());
+  }
+
+  private static boolean shouldCountObject(
+      final Object object, final Set<Object> deduplicatedObjects) {
+    return object != null && (deduplicatedObjects == null || 
deduplicatedObjects.add(object));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java
new file mode 100644
index 00000000000..cefc2377dc7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Unit tests for {@link InsertNodeMemoryEstimator}.
+ *
+ * <p>Each test takes estimates for inputs that differ in one part (a later child row or tablet,
+ * the results map, or the plan node id) and asserts that the bulkier input yields the strictly
+ * larger estimate.
+ */
+public class InsertNodeMemoryEstimatorTest {
+
+  /** A bulkier second child row must raise the estimate of an {@link InsertRowsNode}. */
+  @Test
+  public void testInsertRowsNodeLaterRowSizeIsEstimated() throws IllegalPathException {
+    final InsertRowNode leadingRow =
+        createTextInsertRowNode("child-1", "root.sg.d1", new String[] {"s1"}, new String[] {"v1"});
+    final InsertRowNode slimTrailingRow =
+        createTextInsertRowNode("child-2", "root.sg.d2", new String[] {"s1"}, new String[] {"v2"});
+    final InsertRowNode bulkyTrailingRow =
+        createTextInsertRowNode(
+            "child-3",
+            "root.sg.device_with_a_longer_path_segment",
+            new String[] {"s1", "measurement_with_a_longer_name", "s3"},
+            new String[] {"v2", repeatedString("payload", 32), repeatedString("payload", 48)});
+
+    final InsertRowsNode slimParent =
+        createInsertRowsNode("parent", leadingRow, slimTrailingRow);
+    final long slimEstimate = InsertNodeMemoryEstimator.sizeOf(slimParent);
+    final InsertRowsNode bulkyParent =
+        createInsertRowsNode("parent", leadingRow, bulkyTrailingRow);
+    final long bulkyEstimate = InsertNodeMemoryEstimator.sizeOf(bulkyParent);
+
+    Assert.assertTrue(bulkyEstimate > slimEstimate);
+  }
+
+  /**
+   * Entries in the results map of an {@link InsertRowsNode}, and sub-statuses nested inside them,
+   * must both raise the estimate.
+   */
+  @Test
+  public void testInsertRowsNodeResultsAreEstimated() throws IllegalPathException {
+    final InsertRowNode firstChild =
+        createTextInsertRowNode("child-1", "root.sg.d1", new String[] {"s1"}, new String[] {"v1"});
+    final InsertRowNode secondChild =
+        createTextInsertRowNode("child-2", "root.sg.d2", new String[] {"s1"}, new String[] {"v2"});
+    final InsertRowsNode parent = createInsertRowsNode("parent", firstChild, secondChild);
+
+    final long emptyResultsEstimate = InsertNodeMemoryEstimator.sizeOf(parent);
+
+    final TSStatus flatStatus = createStatus("outer-message");
+    parent.getResults().put(1, flatStatus);
+    final long flatStatusEstimate = InsertNodeMemoryEstimator.sizeOf(parent);
+
+    // Replace the entry under the same key with a status that carries one nested sub-status.
+    final TSStatus nestedStatus = createStatus("outer-message");
+    final List<TSStatus> innerStatuses = new ArrayList<>();
+    innerStatuses.add(createStatus(repeatedString("inner-message", 16)));
+    nestedStatus.setSubStatus(innerStatuses);
+    parent.getResults().put(1, nestedStatus);
+    final long nestedStatusEstimate = InsertNodeMemoryEstimator.sizeOf(parent);
+
+    Assert.assertTrue(flatStatusEstimate > emptyResultsEstimate);
+    Assert.assertTrue(nestedStatusEstimate > flatStatusEstimate);
+  }
+
+  /**
+   * A bulkier second child row must raise the estimate of an {@link InsertRowsOfOneDeviceNode}.
+   */
+  @Test
+  public void testInsertRowsOfOneDeviceNodeLaterRowSizeIsEstimated() throws IllegalPathException {
+    final String[] sharedMeasurements = new String[] {"s1", "s2"};
+    final InsertRowNode leadingRow =
+        createTextInsertRowNode(
+            "child-1", "root.sg.d1", new String[] {"s1", "s2"}, new String[] {"v1", "v2"});
+    final InsertRowNode slimTrailingRow =
+        createTextInsertRowNode(
+            "child-2", "root.sg.d1", new String[] {"s1", "s2"}, new String[] {"v3", "v4"});
+    final InsertRowNode bulkyTrailingRow =
+        createTextInsertRowNode(
+            "child-3",
+            "root.sg.d1",
+            sharedMeasurements,
+            new String[] {repeatedString("payload", 32), repeatedString("payload", 48)});
+
+    final InsertRowsOfOneDeviceNode slimParent =
+        createInsertRowsOfOneDeviceNode("parent", leadingRow, slimTrailingRow);
+    final long slimEstimate = InsertNodeMemoryEstimator.sizeOf(slimParent);
+    final InsertRowsOfOneDeviceNode bulkyParent =
+        createInsertRowsOfOneDeviceNode("parent", leadingRow, bulkyTrailingRow);
+    final long bulkyEstimate = InsertNodeMemoryEstimator.sizeOf(bulkyParent);
+
+    Assert.assertTrue(bulkyEstimate > slimEstimate);
+  }
+
+  /** A bulkier second tablet must raise the estimate of an {@link InsertMultiTabletsNode}. */
+  @Test
+  public void testInsertMultiTabletsNodeLaterTabletSizeIsEstimated() throws IllegalPathException {
+    final InsertTabletNode leadingTablet =
+        createTextInsertTabletNode("child-1", "root.sg.d1", 1, 1, 2);
+    final InsertTabletNode slimTrailingTablet =
+        createTextInsertTabletNode("child-2", "root.sg.d2", 1, 1, 2);
+    final InsertTabletNode bulkyTrailingTablet =
+        createTextInsertTabletNode("child-3", "root.sg.d3", 3, 8, 16);
+
+    final InsertMultiTabletsNode slimParent =
+        createInsertMultiTabletsNode("parent", leadingTablet, slimTrailingTablet);
+    final long slimEstimate = InsertNodeMemoryEstimator.sizeOf(slimParent);
+    final InsertMultiTabletsNode bulkyParent =
+        createInsertMultiTabletsNode("parent", leadingTablet, bulkyTrailingTablet);
+    final long bulkyEstimate = InsertNodeMemoryEstimator.sizeOf(bulkyParent);
+
+    Assert.assertTrue(bulkyEstimate > slimEstimate);
+  }
+
+  /** A longer plan node id must raise the estimate of an otherwise identical row node. */
+  @Test
+  public void testPlanNodeIdIsEstimated() throws IllegalPathException {
+    final InsertRowNode shortIdRow =
+        createTextInsertRowNode("id", "root.sg.d1", new String[] {"s1"}, new String[] {"v1"});
+    final InsertRowNode longIdRow =
+        createTextInsertRowNode(
+            repeatedString("plan-node-id", 12),
+            "root.sg.d1",
+            new String[] {"s1"},
+            new String[] {"v1"});
+
+    final long longIdEstimate = InsertNodeMemoryEstimator.sizeOf(longIdRow);
+    final long shortIdEstimate = InsertNodeMemoryEstimator.sizeOf(shortIdRow);
+
+    Assert.assertTrue(longIdEstimate > shortIdEstimate);
+  }
+
+  /**
+   * Builds an {@link InsertRowsNode} holding the given rows at indexes {@code 0..n-1}. The
+   * parent's target path, schemas, measurements and data types are copied from the first row, so
+   * at least one row is required.
+   */
+  private static InsertRowsNode createInsertRowsNode(
+      String planNodeId, InsertRowNode... insertRowNodes) {
+    final InsertRowsNode parent = new InsertRowsNode(new PlanNodeId(planNodeId));
+    int rowIndex = 0;
+    for (final InsertRowNode row : insertRowNodes) {
+      parent.addOneInsertRowNode(row, rowIndex);
+      rowIndex++;
+    }
+    final InsertRowNode templateRow = insertRowNodes[0];
+    parent.setTargetPath(templateRow.getTargetPath());
+    parent.setMeasurementSchemas(templateRow.getMeasurementSchemas());
+    parent.setMeasurements(templateRow.getMeasurements());
+    parent.setDataTypes(templateRow.getDataTypes());
+    return parent;
+  }
+
+  /**
+   * Builds an {@link InsertRowsOfOneDeviceNode} holding the given rows with index list {@code
+   * 0..n-1}.
+   */
+  private static InsertRowsOfOneDeviceNode createInsertRowsOfOneDeviceNode(
+      String planNodeId, InsertRowNode... insertRowNodes) {
+    final InsertRowsOfOneDeviceNode parent =
+        new InsertRowsOfOneDeviceNode(new PlanNodeId(planNodeId));
+    final List<InsertRowNode> children = new ArrayList<>(Arrays.asList(insertRowNodes));
+    final List<Integer> positions = new ArrayList<>();
+    for (int position = 0; position < insertRowNodes.length; position++) {
+      positions.add(position);
+    }
+    parent.setInsertRowNodeList(children);
+    parent.setInsertRowNodeIndexList(positions);
+    return parent;
+  }
+
+  /** Builds an {@link InsertMultiTabletsNode} holding the given tablets at indexes 0..n-1. */
+  private static InsertMultiTabletsNode createInsertMultiTabletsNode(
+      String planNodeId, InsertTabletNode... insertTabletNodes) {
+    final InsertMultiTabletsNode parent = new InsertMultiTabletsNode(new PlanNodeId(planNodeId));
+    int tabletIndex = 0;
+    for (final InsertTabletNode tablet : insertTabletNodes) {
+      parent.addInsertTabletNode(tablet, tabletIndex);
+      tabletIndex++;
+    }
+    return parent;
+  }
+
+  /**
+   * Builds an {@link InsertRowNode} at time {@code 1L} whose measurements are all of type TEXT,
+   * with {@code values[i]} stored as the value of {@code measurements[i]}.
+   */
+  private static InsertRowNode createTextInsertRowNode(
+      String planNodeId, String devicePath, String[] measurements, String[] values)
+      throws IllegalPathException {
+    final int columnCount = measurements.length;
+    final TSDataType[] columnTypes = new TSDataType[columnCount];
+    Arrays.fill(columnTypes, TSDataType.TEXT);
+    final MeasurementSchema[] columnSchemas = new MeasurementSchema[columnCount];
+    final Object[] cells = new Object[columnCount];
+    for (int column = 0; column < columnCount; column++) {
+      columnSchemas[column] = new MeasurementSchema(measurements[column], TSDataType.TEXT);
+      cells[column] = new Binary(values[column], TSFileConfig.STRING_CHARSET);
+    }
+    final PlanNodeId nodeId = new PlanNodeId(planNodeId);
+    final PartialPath device = new PartialPath(devicePath);
+    return new InsertRowNode(
+        nodeId, device, false, measurements, columnTypes, columnSchemas, 1L, cells, false);
+  }
+
+  /**
+   * Builds an {@link InsertTabletNode} with {@code measurementCount} TEXT columns named {@code
+   * s0, s1, ...} and {@code rowCount} rows whose timestamps are {@code 0..rowCount-1}.
+   */
+  private static InsertTabletNode createTextInsertTabletNode(
+      String planNodeId, String devicePath, int measurementCount, int rowCount, int repeatCount)
+      throws IllegalPathException {
+    final String[] names = new String[measurementCount];
+    final TSDataType[] columnTypes = new TSDataType[measurementCount];
+    Arrays.fill(columnTypes, TSDataType.TEXT);
+    final MeasurementSchema[] columnSchemas = new MeasurementSchema[measurementCount];
+    final Object[] columns = new Object[measurementCount];
+    for (int column = 0; column < measurementCount; column++) {
+      names[column] = "s" + column;
+      columnSchemas[column] = new MeasurementSchema(names[column], TSDataType.TEXT);
+      columns[column] = createTextColumn(column, rowCount, repeatCount);
+    }
+
+    final long[] timestamps = new long[rowCount];
+    Arrays.setAll(timestamps, rowIndex -> rowIndex);
+
+    final PlanNodeId nodeId = new PlanNodeId(planNodeId);
+    final PartialPath device = new PartialPath(devicePath);
+    return new InsertTabletNode(
+        nodeId,
+        device,
+        false,
+        names,
+        columnTypes,
+        columnSchemas,
+        timestamps,
+        null,
+        columns,
+        rowCount);
+  }
+
+  /**
+   * Builds one TEXT column of {@code rowCount} cells. Each cell is the string {@code
+   * value-<measurementIndex>-<rowIndex>} repeated {@code repeatCount} times.
+   */
+  private static Binary[] createTextColumn(int measurementIndex, int rowCount, int repeatCount) {
+    final Binary[] cells = new Binary[rowCount];
+    for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+      final String unit = "value-" + measurementIndex + "-" + rowIndex;
+      cells[rowIndex] = new Binary(repeatedString(unit, repeatCount), TSFileConfig.STRING_CHARSET);
+    }
+    return cells;
+  }
+
+  /** Builds a {@link TSStatus} with the EXECUTE_STATEMENT_ERROR code and the given message. */
+  private static TSStatus createStatus(String message) {
+    final TSStatus failure = new TSStatus();
+    failure.setCode(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    failure.setMessage(message);
+    return failure;
+  }
+
+  /** Returns {@code unit} concatenated with itself {@code repeatCount} times. */
+  private static String repeatedString(String unit, int repeatCount) {
+    final StringBuilder buffer = new StringBuilder(unit.length() * repeatCount);
+    for (int remaining = repeatCount; remaining > 0; remaining--) {
+      buffer.append(unit);
+    }
+    return buffer.toString();
+  }
+}


Reply via email to