This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 1f1d2c182c2 [To dev/1.3] Pipe: Fixed the OOM bug of tablet memory 
calculation & Optimized the tablet size by memory estimation (#17451)
1f1d2c182c2 is described below

commit 1f1d2c182c2a8ee1f0ae5e5c969785043eacdf08
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 10 14:13:35 2026 +0800

    [To dev/1.3] Pipe: Fixed the OOM bug of tablet memory calculation & 
Optimized the tablet size by memory estimation (#17451)
    
    * fix
    
    * fix
    
    * push
    
    * ger-limit
    
    * fix
    
    * [To dev/1.3] Pipe: Fixed the OOM bug of tablet memory calculation & 
Optimized the tablet size by memory estimation
    
    * fix
    
    * fix
    
    * fix
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
 .../container/TsFileInsertionDataContainer.java    |  4 +-
 .../scan/TsFileInsertionScanDataContainer.java     |  3 +-
 .../pipe/resource/memory/PipeMemoryWeightUtil.java | 92 ++++++----------------
 .../event/TsFileInsertionDataContainerTest.java    | 13 +++
 .../apache/iotdb/commons/conf/CommonConfig.java    |  4 +-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 20 ++---
 7 files changed, 65 insertions(+), 81 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b2655e350a2..989dc757464 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import 
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -3388,6 +3389,15 @@ public class IoTDBConfig {
     this.partitionCacheSize = partitionCacheSize;
   }
 
+  public int getPipeDataStructureTabletSizeInBytes() {
+    int size = 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
+    if (size > thriftMaxFrameSize) {
+      size = (int) (thriftMaxFrameSize * 0.8);
+      
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
+    }
+    return size;
+  }
+
   public int getAuthorCacheSize() {
     return authorCacheSize;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index cbbfea0a5b2..279996690ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -21,9 +21,9 @@ package 
org.apache.iotdb.db.pipe.event.common.tsfile.container;
 
 import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -92,7 +92,7 @@ public abstract class TsFileInsertionDataContainer implements 
AutoCloseable {
     this.allocatedMemoryBlockForTablet =
         PipeDataNodeResourceManager.memory()
             .forceAllocateForTabletWithRetry(
-                
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+                
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index ce2c2b8e467..271bc317d0f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
@@ -124,7 +125,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     this.allocatedMemoryBlockForBatchData =
         PipeDataNodeResourceManager.memory()
             .forceAllocateForTabletWithRetry(
-                
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+                
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
     this.allocatedMemoryBlockForChunk =
         PipeDataNodeResourceManager.memory()
             
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index c9b21d780ee..a707f554c51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.resource.memory;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
 import org.apache.iotdb.db.utils.MemUtils;
 
@@ -33,10 +34,10 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class PipeMemoryWeightUtil {
 
@@ -107,7 +108,10 @@ public class PipeMemoryWeightUtil {
       }
     }
 
-    return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, 
schemaCount);
+    return calculateTabletRowCountAndMemoryBySize(
+        totalSizeInBytes,
+        schemaCount,
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
   }
 
   /**
@@ -152,7 +156,8 @@ public class PipeMemoryWeightUtil {
       }
     }
 
-    return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, 
schemaCount);
+    return calculateTabletRowCountAndMemoryBySize(
+        totalSizeInBytes, schemaCount, batchData.length());
   }
 
   /**
@@ -162,22 +167,28 @@ public class PipeMemoryWeightUtil {
    * @return left is the row count of tablet, right is the memory cost of 
tablet in bytes
    */
   public static Pair<Integer, Integer> 
calculateTabletRowCountAndMemory(PipeRow row) {
-    return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), 
row.size());
+    return calculateTabletRowCountAndMemoryBySize(
+        row.getCurrentRowSize(),
+        row.size(),
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
   }
 
   private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
-      int rowSize, int schemaCount) {
-    if (rowSize <= 0) {
+      int rowBytesUsed, int schemaCount, int inputNum) {
+    if (rowBytesUsed <= 0) {
       return new Pair<>(1, 0);
     }
 
     // Calculate row number according to the max size of a pipe tablet.
     // "-100" is the estimated size of other data structures in a pipe tablet.
     // "*8" converts bytes to bits, because the bitmap size is 1 bit per 
schema.
-    int rowNumber =
-        8
-            * 
(PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
-            / (8 * rowSize + schemaCount);
+    // Here we estimate the max use of
+    int sizeLimit =
+        Math.min(
+            
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
+            (int) (inputNum * rowBytesUsed * 1.2));
+
+    int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
     rowNumber = Math.max(1, rowNumber);
 
     if ( // This means the row number is larger than the max row count of a 
pipe tablet
@@ -185,67 +196,14 @@ public class PipeMemoryWeightUtil {
       // Bound the row number, the memory cost is rowSize * rowNumber
       return new Pair<>(
           PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
-          rowSize * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+          rowBytesUsed * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
     } else {
-      return new Pair<>(
-          rowNumber, 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+      return new Pair<>(rowNumber, sizeLimit);
     }
   }
 
-  public static long calculateTabletSizeInBytes(Tablet tablet) {
-    long totalSizeInBytes = 0;
-
-    if (tablet == null) {
-      return totalSizeInBytes;
-    }
-
-    // timestamps
-    if (tablet.timestamps != null) {
-      totalSizeInBytes += tablet.timestamps.length * 8L;
-    }
-
-    // values
-    final List<MeasurementSchema> timeseries = tablet.getSchemas();
-    if (timeseries != null) {
-      for (int column = 0; column < timeseries.size(); column++) {
-        final MeasurementSchema measurementSchema = timeseries.get(column);
-        if (measurementSchema == null) {
-          continue;
-        }
-
-        final TSDataType tsDataType = measurementSchema.getType();
-        if (tsDataType == null) {
-          continue;
-        }
-
-        if (tsDataType.isBinary()) {
-          if (tablet.values == null || tablet.values.length <= column) {
-            continue;
-          }
-          final Binary[] values = ((Binary[]) tablet.values[column]);
-          if (values == null) {
-            continue;
-          }
-          for (Binary value : values) {
-            totalSizeInBytes += value == null ? 8 : value.ramBytesUsed();
-          }
-        } else {
-          totalSizeInBytes += (long) tablet.getMaxRowNumber() * 
tsDataType.getDataTypeSize();
-        }
-      }
-    }
-
-    // bitMaps
-    if (tablet.bitMaps != null) {
-      for (int i = 0; i < tablet.bitMaps.length; i++) {
-        totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : 
tablet.bitMaps[i].getSize();
-      }
-    }
-
-    // estimate other dataStructures size
-    totalSizeInBytes += 100;
-
-    return totalSizeInBytes;
+  public static long calculateTabletSizeInBytes(final Tablet tablet) {
+    return Objects.nonNull(tablet) ? tablet.ramBytesUsed() : 0L;
   }
 
   public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 2674e8b8955..bd4e3923815 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.event;
 
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
@@ -47,6 +49,7 @@ import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,9 +81,19 @@ public class TsFileInsertionDataContainerTest {
   private File alignedTsFile;
   private File nonalignedTsFile;
   private TsFileResource resource;
+  private boolean isPipeMemoryManagementEnabled;
+
+  @Before
+  public void setUp() throws Exception {
+    isPipeMemoryManagementEnabled = 
PipeConfig.getInstance().getPipeMemoryManagementEnabled();
+    
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
+  }
 
   @After
   public void tearDown() throws Exception {
+    CommonDescriptor.getInstance()
+        .getConfig()
+        .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
     if (alignedTsFile != null) {
       alignedTsFile.delete();
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 44fd17c3cfe..f8e845a7af2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -214,7 +214,9 @@ public class CommonConfig {
   private boolean pipeRetryLocallyForParallelOrUserConflict = true;
 
   private int pipeDataStructureTabletRowSize = 2048;
-  private int pipeDataStructureTabletSizeInBytes = 2097152;
+
+  // 128MB
+  private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.3;
   private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.3;
   private volatile double pipeTotalFloatingMemoryProportion = 0.5;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 65283916939..82b85e3f828 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -141,11 +141,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         } catch (Exception e) {
           PipeLogger.log(
               LOGGER::warn,
+              e,
               "Receiver id = %s: Failed to delete original receiver file dir 
%s, because %s.",
               receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath(),
-              e.getMessage(),
-              e);
+              e.getMessage());
         }
       } else {
         if (LOGGER.isDebugEnabled()) {
@@ -487,11 +487,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       } catch (final Exception e) {
         PipeLogger.log(
             LOGGER::warn,
+            e,
             "Receiver id = %s: Failed to close current writing file writer %s, 
because %s.",
             receiverId.get(),
             writingFile == null ? "null" : writingFile.getPath(),
-            e.getMessage(),
-            e);
+            e.getMessage());
       }
       writingFileWriter = null;
     } else {
@@ -526,11 +526,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       } catch (final Exception e) {
         PipeLogger.log(
             LOGGER::warn,
+            e,
             "Receiver id = %s: Failed to delete original writing file %s, 
because %s.",
             receiverId.get(),
             file.getPath(),
-            e.getMessage(),
-            e);
+            e.getMessage());
       }
     } else {
       if (LOGGER.isDebugEnabled()) {
@@ -611,11 +611,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     } catch (final Exception e) {
       PipeLogger.log(
           LOGGER::warn,
+          e,
           "Receiver id = %s: Failed to seal file %s from req %s.",
           receiverId.get(),
           writingFile,
-          req,
-          e);
+          req);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -698,11 +698,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     } catch (final Exception e) {
       PipeLogger.log(
           LOGGER::warn,
+          e,
           "Receiver id = %s: Failed to seal file %s from req %s.",
           receiverId.get(),
           files,
-          req,
-          e);
+          req);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,

Reply via email to