This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e4661bef60 Pipe: Optimized the tablet size by memory estimation 
(#17452)
3e4661bef60 is described below

commit 3e4661bef60f0a43acdd63aaabe2ddc955f9581e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 10 14:13:59 2026 +0800

    Pipe: Optimized the tablet size by memory estimation (#17452)
    
    * fix
    
    * push
    
    * ger-limit
    
    * fix
    
    * fix
    
    * fix
    
    * sptls
    
    * Update CommonConfig.java
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++++++
 .../tsfile/parser/TsFileInsertionEventParser.java  |  4 +--
 .../scan/TsFileInsertionEventScanParser.java       |  3 +-
 .../table/TsFileInsertionEventTableParser.java     |  6 ++--
 .../pipe/resource/memory/PipeMemoryWeightUtil.java | 34 ++++++++++++++--------
 .../pipe/event/TsFileInsertionEventParserTest.java | 13 +++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |  4 ++-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 24 +++++++--------
 8 files changed, 68 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 69eab2d79cf..31f446df270 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.exception.LoadConfigurationException;
@@ -3287,6 +3288,15 @@ public class IoTDBConfig {
     this.partitionCacheSize = partitionCacheSize;
   }
 
+  public int getPipeDataStructureTabletSizeInBytes() {
+    int size = 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
+    if (size > thriftMaxFrameSize) {
+      size = (int) (thriftMaxFrameSize * 0.8);
+      
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
+    }
+    return size;
+  }
+
   public int getAuthorCacheSize() {
     return authorCacheSize;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index 82f3e0ea2b8..678e7a4a62a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser;
 import org.apache.iotdb.commons.audit.IAuditEntity;
 import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -106,7 +106,7 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
     this.allocatedMemoryBlockForTablet =
         PipeDataNodeResourceManager.memory()
             .forceAllocateForTabletWithRetry(
-                
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+                
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 2bd22c8b0bc..da9d7d00477 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
@@ -135,7 +136,7 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
     this.allocatedMemoryBlockForBatchData =
         PipeDataNodeResourceManager.memory()
             .forceAllocateForTabletWithRetry(
-                
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+                
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
     this.allocatedMemoryBlockForChunk =
         PipeDataNodeResourceManager.memory()
             
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index e353163e726..af2fd214e4a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -93,7 +93,7 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
               
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
       long tableSize =
           Math.min(
-              PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(),
+              
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
               IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
 
       this.allocatedMemoryBlockForChunk =
@@ -107,7 +107,9 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
       this.allocatedMemoryBlockForTableSchemas =
           PipeDataNodeResourceManager.memory()
               .forceAllocateForTabletWithRetry(
-                  
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+                  IoTDBDescriptor.getInstance()
+                      .getConfig()
+                      .getPipeDataStructureTabletSizeInBytes());
 
       this.startTime = startTime;
       this.endTime = endTime;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 458434d8494..833bd3577eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.resource.memory;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
 import org.apache.iotdb.db.utils.MemUtils;
 
@@ -118,7 +119,10 @@ public class PipeMemoryWeightUtil {
       }
     }
 
-    return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, 
schemaCount);
+    return calculateTabletRowCountAndMemoryBySize(
+        totalSizeInBytes,
+        schemaCount,
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
   }
 
   /**
@@ -163,7 +167,8 @@ public class PipeMemoryWeightUtil {
       }
     }
 
-    return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, 
schemaCount);
+    return calculateTabletRowCountAndMemoryBySize(
+        totalSizeInBytes, schemaCount, batchData.length());
   }
 
   /**
@@ -173,22 +178,28 @@ public class PipeMemoryWeightUtil {
    * @return left is the row count of tablet, right is the memory cost of 
tablet in bytes
    */
   public static Pair<Integer, Integer> 
calculateTabletRowCountAndMemory(PipeRow row) {
-    return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), 
row.size());
+    return calculateTabletRowCountAndMemoryBySize(
+        row.getCurrentRowSize(),
+        row.size(),
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
   }
 
   private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
-      int rowSize, int schemaCount) {
-    if (rowSize <= 0) {
+      int rowBytesUsed, int schemaCount, int inputNum) {
+    if (rowBytesUsed <= 0) {
       return new Pair<>(1, 0);
     }
 
     // Calculate row number according to the max size of a pipe tablet.
     // "-100" is the estimated size of other data structures in a pipe tablet.
     // "*8" converts bytes to bits, because the bitmap size is 1 bit per 
schema.
-    int rowNumber =
-        8
-            * 
(PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
-            / (8 * rowSize + schemaCount);
+    // Here we estimate the max use of
+    int sizeLimit =
+        Math.min(
+            
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
+            (int) (inputNum * rowBytesUsed * 1.2));
+
+    int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
     rowNumber = Math.max(1, rowNumber);
 
     if ( // This means the row number is larger than the max row count of a 
pipe tablet
@@ -196,10 +207,9 @@ public class PipeMemoryWeightUtil {
       // Bound the row number, the memory cost is rowSize * rowNumber
       return new Pair<>(
           PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
-          rowSize * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+          rowBytesUsed * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
     } else {
-      return new Pair<>(
-          rowNumber, 
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+      return new Pair<>(rowNumber, sizeLimit);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 8814190755e..a2e7c558ea0 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.event;
 
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
@@ -48,6 +50,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,9 +82,19 @@ public class TsFileInsertionEventParserTest {
   private File alignedTsFile;
   private File nonalignedTsFile;
   private TsFileResource resource;
+  private boolean isPipeMemoryManagementEnabled;
+
+  @Before
+  public void setUp() throws Exception {
+    isPipeMemoryManagementEnabled = 
PipeConfig.getInstance().getPipeMemoryManagementEnabled();
+    
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
+  }
 
   @After
   public void tearDown() throws Exception {
+    CommonDescriptor.getInstance()
+        .getConfig()
+        .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
     if (alignedTsFile != null) {
       alignedTsFile.delete();
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4b754e0b135..dcf5e36ea58 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -218,7 +218,9 @@ public class CommonConfig {
   private boolean pipeRetryLocallyForParallelOrUserConflict = true;
 
   private int pipeDataStructureTabletRowSize = 2048;
-  private int pipeDataStructureTabletSizeInBytes = 2097152;
+
+  // 60MB
+  private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.3;
   private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.3;
   private volatile double pipeTotalFloatingMemoryProportion = 0.5;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 8140348676a..e2484576a77 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -146,11 +146,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         } catch (Exception e) {
           PipeLogger.log(
               LOGGER::warn,
+              e,
               "Receiver id = %s: Failed to delete original receiver file dir 
%s, because %s.",
               receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath(),
-              e.getMessage(),
-              e);
+              e.getMessage());
         }
       } else {
         if (LOGGER.isDebugEnabled()) {
@@ -184,9 +184,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       } catch (Exception e) {
         PipeLogger.log(
             LOGGER::warn,
+            e,
             "Receiver id = %s: Failed to create pipe receiver file folder 
because all disks of folders are full.",
-            receiverId.get(),
-            e);
+            receiverId.get());
         return new 
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
       }
 
@@ -535,11 +535,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       } catch (final Exception e) {
         PipeLogger.log(
             LOGGER::warn,
+            e,
             "Receiver id = %s: Failed to close current writing file writer %s, 
because %s.",
             receiverId.get(),
             writingFile == null ? "null" : writingFile.getPath(),
-            e.getMessage(),
-            e);
+            e.getMessage());
       }
       writingFileWriter = null;
     } else {
@@ -574,11 +574,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       } catch (final Exception e) {
         PipeLogger.log(
             LOGGER::warn,
+            e,
             "Receiver id = %s: Failed to delete original writing file %s, 
because %s.",
             receiverId.get(),
             file.getPath(),
-            e.getMessage(),
-            e);
+            e.getMessage());
       }
     } else {
       if (LOGGER.isDebugEnabled()) {
@@ -659,11 +659,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     } catch (final Exception e) {
       PipeLogger.log(
           LOGGER::warn,
+          e,
           "Receiver id = %s: Failed to seal file %s from req %s.",
           receiverId.get(),
           writingFile,
-          req,
-          e);
+          req);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -756,11 +756,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     } catch (final Exception e) {
       PipeLogger.log(
           LOGGER::warn,
+          e,
           "Receiver id = %s: Failed to seal file %s from req %s.",
           receiverId.get(),
           files,
-          req,
-          e);
+          req);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,

Reply via email to