This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3e4661bef60 Pipe: Optimized the tablet size by memory estimation
(#17452)
3e4661bef60 is described below
commit 3e4661bef60f0a43acdd63aaabe2ddc955f9581e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 10 14:13:59 2026 +0800
Pipe: Optimized the tablet size by memory estimation (#17452)
* fix
* push
* ger-limit
* fix
* fix
* fix
* sptls
* Update CommonConfig.java
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++++++
.../tsfile/parser/TsFileInsertionEventParser.java | 4 +--
.../scan/TsFileInsertionEventScanParser.java | 3 +-
.../table/TsFileInsertionEventTableParser.java | 6 ++--
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 34 ++++++++++++++--------
.../pipe/event/TsFileInsertionEventParserTest.java | 13 +++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 4 ++-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 24 +++++++--------
8 files changed, 68 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 69eab2d79cf..31f446df270 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.exception.LoadConfigurationException;
@@ -3287,6 +3288,15 @@ public class IoTDBConfig {
this.partitionCacheSize = partitionCacheSize;
}
+ public int getPipeDataStructureTabletSizeInBytes() {
+ int size =
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
+ if (size > thriftMaxFrameSize) {
+ size = (int) (thriftMaxFrameSize * 0.8);
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
+ }
+ return size;
+ }
+
public int getAuthorCacheSize() {
return authorCacheSize;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index 82f3e0ea2b8..678e7a4a62a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser;
import org.apache.iotdb.commons.audit.IAuditEntity;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -106,7 +106,7 @@ public abstract class TsFileInsertionEventParser implements
AutoCloseable {
this.allocatedMemoryBlockForTablet =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 2bd22c8b0bc..da9d7d00477 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
@@ -135,7 +136,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index e353163e726..af2fd214e4a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -93,7 +93,7 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
.forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed());
long tableSize =
Math.min(
- PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(),
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
this.allocatedMemoryBlockForChunk =
@@ -107,7 +107,9 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
this.allocatedMemoryBlockForTableSchemas =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getPipeDataStructureTabletSizeInBytes());
this.startTime = startTime;
this.endTime = endTime;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 458434d8494..833bd3577eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.resource.memory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.utils.MemUtils;
@@ -118,7 +119,10 @@ public class PipeMemoryWeightUtil {
}
}
- return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes,
schemaCount);
+ return calculateTabletRowCountAndMemoryBySize(
+ totalSizeInBytes,
+ schemaCount,
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}
/**
@@ -163,7 +167,8 @@ public class PipeMemoryWeightUtil {
}
}
- return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes,
schemaCount);
+ return calculateTabletRowCountAndMemoryBySize(
+ totalSizeInBytes, schemaCount, batchData.length());
}
/**
@@ -173,22 +178,28 @@ public class PipeMemoryWeightUtil {
* @return left is the row count of tablet, right is the memory cost of
tablet in bytes
*/
public static Pair<Integer, Integer>
calculateTabletRowCountAndMemory(PipeRow row) {
- return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(),
row.size());
+ return calculateTabletRowCountAndMemoryBySize(
+ row.getCurrentRowSize(),
+ row.size(),
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}
private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
- int rowSize, int schemaCount) {
- if (rowSize <= 0) {
+ int rowBytesUsed, int schemaCount, int inputNum) {
+ if (rowBytesUsed <= 0) {
return new Pair<>(1, 0);
}
// Calculate row number according to the max size of a pipe tablet.
// "-100" is the estimated size of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per
schema.
- int rowNumber =
- 8
- *
(PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
- / (8 * rowSize + schemaCount);
+ // Here we estimate the max memory use of the input rows (inputNum * rowBytesUsed, plus a 20% margin) and cap the size limit by it.
+ int sizeLimit =
+ Math.min(
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
+ (int) (inputNum * rowBytesUsed * 1.2));
+
+ int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);
if ( // This means the row number is larger than the max row count of a
pipe tablet
@@ -196,10 +207,9 @@ public class PipeMemoryWeightUtil {
// Bound the row number, the memory cost is rowSize * rowNumber
return new Pair<>(
PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
- rowSize *
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+ rowBytesUsed *
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
} else {
- return new Pair<>(
- rowNumber,
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+ return new Pair<>(rowNumber, sizeLimit);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 8814190755e..a2e7c558ea0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.event;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
@@ -48,6 +50,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,9 +82,19 @@ public class TsFileInsertionEventParserTest {
private File alignedTsFile;
private File nonalignedTsFile;
private TsFileResource resource;
+ private boolean isPipeMemoryManagementEnabled;
+
+ @Before
+ public void setUp() throws Exception {
+ isPipeMemoryManagementEnabled =
PipeConfig.getInstance().getPipeMemoryManagementEnabled();
+
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
+ }
@After
public void tearDown() throws Exception {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
if (alignedTsFile != null) {
alignedTsFile.delete();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4b754e0b135..dcf5e36ea58 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -218,7 +218,9 @@ public class CommonConfig {
private boolean pipeRetryLocallyForParallelOrUserConflict = true;
private int pipeDataStructureTabletRowSize = 2048;
- private int pipeDataStructureTabletSizeInBytes = 2097152;
+
+ // 60MB
+ private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.3;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.3;
private volatile double pipeTotalFloatingMemoryProportion = 0.5;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 8140348676a..e2484576a77 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -146,11 +146,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to delete original receiver file dir
%s, because %s.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
- e.getMessage(),
- e);
+ e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
@@ -184,9 +184,9 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to create pipe receiver file folder
because all disks of folders are full.",
- receiverId.get(),
- e);
+ receiverId.get());
return new
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
}
@@ -535,11 +535,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to close current writing file writer %s,
because %s.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
- e.getMessage(),
- e);
+ e.getMessage());
}
writingFileWriter = null;
} else {
@@ -574,11 +574,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to delete original writing file %s,
because %s.",
receiverId.get(),
file.getPath(),
- e.getMessage(),
- e);
+ e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
@@ -659,11 +659,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
writingFile,
- req,
- e);
+ req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -756,11 +756,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
files,
- req,
- e);
+ req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,