This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1f1d2c182c2 [To dev/1.3] Pipe: Fixed the OOM bug of tablet memory
calculation & Optimized the tablet size by memory estimation (#17451)
1f1d2c182c2 is described below
commit 1f1d2c182c2a8ee1f0ae5e5c969785043eacdf08
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 10 14:13:35 2026 +0800
[To dev/1.3] Pipe: Fixed the OOM bug of tablet memory calculation &
Optimized the tablet size by memory estimation (#17451)
* fix
* fix
* push
* ger-limit
* fix
* [To dev/1.3] Pipe: Fixed the OOM bug of tablet memory calculation &
Optimized the tablet size by memory estimation
* fix
* fix
* fix
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
.../container/TsFileInsertionDataContainer.java | 4 +-
.../scan/TsFileInsertionScanDataContainer.java | 3 +-
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 92 ++++++----------------
.../event/TsFileInsertionDataContainerTest.java | 13 +++
.../apache/iotdb/commons/conf/CommonConfig.java | 4 +-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 20 ++---
7 files changed, 65 insertions(+), 81 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b2655e350a2..989dc757464 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -3388,6 +3389,15 @@ public class IoTDBConfig {
this.partitionCacheSize = partitionCacheSize;
}
+ public int getPipeDataStructureTabletSizeInBytes() {
+ int size =
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
+ if (size > thriftMaxFrameSize) {
+ size = (int) (thriftMaxFrameSize * 0.8);
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
+ }
+ return size;
+ }
+
public int getAuthorCacheSize() {
return authorCacheSize;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index cbbfea0a5b2..279996690ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -21,9 +21,9 @@ package
org.apache.iotdb.db.pipe.event.common.tsfile.container;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -92,7 +92,7 @@ public abstract class TsFileInsertionDataContainer implements
AutoCloseable {
this.allocatedMemoryBlockForTablet =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index ce2c2b8e467..271bc317d0f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
@@ -124,7 +125,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index c9b21d780ee..a707f554c51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.resource.memory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.utils.MemUtils;
@@ -33,10 +34,10 @@ import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class PipeMemoryWeightUtil {
@@ -107,7 +108,10 @@ public class PipeMemoryWeightUtil {
}
}
- return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes,
schemaCount);
+ return calculateTabletRowCountAndMemoryBySize(
+ totalSizeInBytes,
+ schemaCount,
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}
/**
@@ -152,7 +156,8 @@ public class PipeMemoryWeightUtil {
}
}
- return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes,
schemaCount);
+ return calculateTabletRowCountAndMemoryBySize(
+ totalSizeInBytes, schemaCount, batchData.length());
}
/**
@@ -162,22 +167,28 @@ public class PipeMemoryWeightUtil {
* @return left is the row count of tablet, right is the memory cost of
tablet in bytes
*/
public static Pair<Integer, Integer>
calculateTabletRowCountAndMemory(PipeRow row) {
- return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(),
row.size());
+ return calculateTabletRowCountAndMemoryBySize(
+ row.getCurrentRowSize(),
+ row.size(),
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}
private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
- int rowSize, int schemaCount) {
- if (rowSize <= 0) {
+ int rowBytesUsed, int schemaCount, int inputNum) {
+ if (rowBytesUsed <= 0) {
return new Pair<>(1, 0);
}
// Calculate row number according to the max size of a pipe tablet.
// "-100" is the estimated size of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per
schema.
- int rowNumber =
- 8
- *
(PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
- / (8 * rowSize + schemaCount);
+ // Here we estimate the max use of
+ int sizeLimit =
+ Math.min(
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
+ (int) (inputNum * rowBytesUsed * 1.2));
+
+ int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);
if ( // This means the row number is larger than the max row count of a
pipe tablet
@@ -185,67 +196,14 @@ public class PipeMemoryWeightUtil {
// Bound the row number, the memory cost is rowSize * rowNumber
return new Pair<>(
PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
- rowSize *
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+ rowBytesUsed *
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
} else {
- return new Pair<>(
- rowNumber,
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
+ return new Pair<>(rowNumber, sizeLimit);
}
}
- public static long calculateTabletSizeInBytes(Tablet tablet) {
- long totalSizeInBytes = 0;
-
- if (tablet == null) {
- return totalSizeInBytes;
- }
-
- // timestamps
- if (tablet.timestamps != null) {
- totalSizeInBytes += tablet.timestamps.length * 8L;
- }
-
- // values
- final List<MeasurementSchema> timeseries = tablet.getSchemas();
- if (timeseries != null) {
- for (int column = 0; column < timeseries.size(); column++) {
- final MeasurementSchema measurementSchema = timeseries.get(column);
- if (measurementSchema == null) {
- continue;
- }
-
- final TSDataType tsDataType = measurementSchema.getType();
- if (tsDataType == null) {
- continue;
- }
-
- if (tsDataType.isBinary()) {
- if (tablet.values == null || tablet.values.length <= column) {
- continue;
- }
- final Binary[] values = ((Binary[]) tablet.values[column]);
- if (values == null) {
- continue;
- }
- for (Binary value : values) {
- totalSizeInBytes += value == null ? 8 : value.ramBytesUsed();
- }
- } else {
- totalSizeInBytes += (long) tablet.getMaxRowNumber() *
tsDataType.getDataTypeSize();
- }
- }
- }
-
- // bitMaps
- if (tablet.bitMaps != null) {
- for (int i = 0; i < tablet.bitMaps.length; i++) {
- totalSizeInBytes += tablet.bitMaps[i] == null ? 0 :
tablet.bitMaps[i].getSize();
- }
- }
-
- // estimate other dataStructures size
- totalSizeInBytes += 100;
-
- return totalSizeInBytes;
+ public static long calculateTabletSizeInBytes(final Tablet tablet) {
+ return Objects.nonNull(tablet) ? tablet.ramBytesUsed() : 0L;
}
public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 2674e8b8955..bd4e3923815 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.event;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
@@ -47,6 +49,7 @@ import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,9 +81,19 @@ public class TsFileInsertionDataContainerTest {
private File alignedTsFile;
private File nonalignedTsFile;
private TsFileResource resource;
+ private boolean isPipeMemoryManagementEnabled;
+
+ @Before
+ public void setUp() throws Exception {
+ isPipeMemoryManagementEnabled =
PipeConfig.getInstance().getPipeMemoryManagementEnabled();
+
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
+ }
@After
public void tearDown() throws Exception {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
if (alignedTsFile != null) {
alignedTsFile.delete();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 44fd17c3cfe..f8e845a7af2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -214,7 +214,9 @@ public class CommonConfig {
private boolean pipeRetryLocallyForParallelOrUserConflict = true;
private int pipeDataStructureTabletRowSize = 2048;
- private int pipeDataStructureTabletSizeInBytes = 2097152;
+
+ // 128MB
+ private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.3;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.3;
private volatile double pipeTotalFloatingMemoryProportion = 0.5;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 65283916939..82b85e3f828 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -141,11 +141,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to delete original receiver file dir
%s, because %s.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
- e.getMessage(),
- e);
+ e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
@@ -487,11 +487,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to close current writing file writer %s,
because %s.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
- e.getMessage(),
- e);
+ e.getMessage());
}
writingFileWriter = null;
} else {
@@ -526,11 +526,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to delete original writing file %s,
because %s.",
receiverId.get(),
file.getPath(),
- e.getMessage(),
- e);
+ e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
@@ -611,11 +611,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
writingFile,
- req,
- e);
+ req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -698,11 +698,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
+ e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
files,
- req,
- e);
+ req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,