This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e6f40694ea1 fix off-heap memory spike issue (#17489)
e6f40694ea1 is described below
commit e6f40694ea16dba517c1b8efab52467a7f5a8c19
Author: Hongkun Xu <[email protected]>
AuthorDate: Tue Jan 13 03:10:36 2026 +0800
fix off-heap memory spike issue (#17489)
Signed-off-by: Hongkun Xu <[email protected]>
---
.../segment/SizeBasedSegmentFlushThresholdComputer.java | 12 ++++++------
.../data/manager/realtime/RealtimeSegmentDataManager.java | 8 ++++----
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java
index 6b3be0b49ff..b4f3ef6dc93 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java
@@ -44,7 +44,7 @@ class SizeBasedSegmentFlushThresholdComputer {
private final Clock _clock;
private long _timeConsumedForLastSegment;
- private int _rowsConsumedForLastSegment;
+ private int _rowsIndexedForLastSegment;
private long _sizeForLastSegment;
private int _rowsThresholdForLastSegment;
private double _segmentRowsToSizeRatio;
@@ -115,7 +115,7 @@ class SizeBasedSegmentFlushThresholdComputer {
// Store values using the actual rows consumed for threshold calculations
_timeConsumedForLastSegment = timeConsumed;
- _rowsConsumedForLastSegment = (int) rowsForCalculation;
+ _rowsIndexedForLastSegment = (int) rowsForCalculation;
_sizeForLastSegment = sizeForCalculation;
_rowsThresholdForLastSegment = rowsThreshold;
@@ -171,9 +171,9 @@ class SizeBasedSegmentFlushThresholdComputer {
// .getSizeThresholdToFlushSegment(),
// we might end up using a lot more memory than required for the segment Using a minor bump strategy, until
// we add feature to adjust time We will only slightly bump the threshold based on numRowsConsumed
- if (_rowsConsumedForLastSegment < _rowsThresholdForLastSegment && _sizeForLastSegment < desiredSegmentSizeBytes) {
+ if (_rowsIndexedForLastSegment < _rowsThresholdForLastSegment && _sizeForLastSegment < desiredSegmentSizeBytes) {
long timeThresholdMs = streamConfig.getFlushThresholdTimeMillis();
- long rowsConsumed = _rowsConsumedForLastSegment;
+ long rowsConsumed = _rowsIndexedForLastSegment;
StringBuilder logStringBuilder = new StringBuilder().append("Time threshold reached. ");
if (timeThresholdMs < _timeConsumedForLastSegment) {
// The administrator has reduced the time threshold. Adjust the
@@ -195,9 +195,9 @@ class SizeBasedSegmentFlushThresholdComputer {
double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;
long targetRows;
if (_sizeForLastSegment < optimalSegmentSizeBytesMin) {
- targetRows = (long) (_rowsConsumedForLastSegment * 1.5);
+ targetRows = (long) (_rowsIndexedForLastSegment * 1.5);
} else if (_sizeForLastSegment > optimalSegmentSizeBytesMax) {
- targetRows = _rowsConsumedForLastSegment / 2;
+ targetRows = _rowsIndexedForLastSegment / 2;
} else {
targetRows = (long) (desiredSegmentSizeBytes * _segmentRowsToSizeRatio);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index c8bff45ad13..788c1b14bc5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1009,7 +1009,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
private boolean startSegmentCommit() {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
- .withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason);
+ .withNumRows(_numRowsIndexed).withInstanceId(_instanceId).withReason(_stopReason);
if (_isOffHeap) {
params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
}
@@ -1295,7 +1295,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
- .withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
+ .withNumRows(_numRowsIndexed).withInstanceId(_instanceId);
if (_isOffHeap) {
params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
}
@@ -1308,7 +1308,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
- .withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason)
+ .withNumRows(_numRowsIndexed).withInstanceId(_instanceId).withReason(_stopReason)
.withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
.withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
.withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
@@ -1447,7 +1447,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
// Retry maybe once if leader is not found.
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withStreamPartitionMsgOffset(_currentOffset.toString()).withSegmentName(_segmentNameStr)
- .withReason(_stopReason).withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
+ .withReason(_stopReason).withNumRows(_numRowsIndexed).withInstanceId(_instanceId);
if (_isOffHeap) {
params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]