This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f40694ea1 fix off-heap memory spike issue (#17489)
e6f40694ea1 is described below

commit e6f40694ea16dba517c1b8efab52467a7f5a8c19
Author: Hongkun Xu <[email protected]>
AuthorDate: Tue Jan 13 03:10:36 2026 +0800

    fix off-heap memory spike issue (#17489)
    
    Signed-off-by: Hongkun Xu <[email protected]>
---
 .../segment/SizeBasedSegmentFlushThresholdComputer.java      | 12 ++++++------
 .../data/manager/realtime/RealtimeSegmentDataManager.java    |  8 ++++----
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java
index 6b3be0b49ff..b4f3ef6dc93 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SizeBasedSegmentFlushThresholdComputer.java
@@ -44,7 +44,7 @@ class SizeBasedSegmentFlushThresholdComputer {
   private final Clock _clock;
 
   private long _timeConsumedForLastSegment;
-  private int _rowsConsumedForLastSegment;
+  private int _rowsIndexedForLastSegment;
   private long _sizeForLastSegment;
   private int _rowsThresholdForLastSegment;
   private double _segmentRowsToSizeRatio;
@@ -115,7 +115,7 @@ class SizeBasedSegmentFlushThresholdComputer {
 
     // Store values using the actual rows consumed for threshold calculations
     _timeConsumedForLastSegment = timeConsumed;
-    _rowsConsumedForLastSegment = (int) rowsForCalculation;
+    _rowsIndexedForLastSegment = (int) rowsForCalculation;
     _sizeForLastSegment = sizeForCalculation;
     _rowsThresholdForLastSegment = rowsThreshold;
 
@@ -171,9 +171,9 @@ class SizeBasedSegmentFlushThresholdComputer {
     // .getSizeThresholdToFlushSegment(),
     // we might end up using a lot more memory than required for the segment 
Using a minor bump strategy, until
     // we add feature to adjust time We will only slightly bump the threshold 
based on numRowsConsumed
-    if (_rowsConsumedForLastSegment < _rowsThresholdForLastSegment && 
_sizeForLastSegment < desiredSegmentSizeBytes) {
+    if (_rowsIndexedForLastSegment < _rowsThresholdForLastSegment && 
_sizeForLastSegment < desiredSegmentSizeBytes) {
       long timeThresholdMs = streamConfig.getFlushThresholdTimeMillis();
-      long rowsConsumed = _rowsConsumedForLastSegment;
+      long rowsConsumed = _rowsIndexedForLastSegment;
       StringBuilder logStringBuilder = new StringBuilder().append("Time 
threshold reached. ");
       if (timeThresholdMs < _timeConsumedForLastSegment) {
         // The administrator has reduced the time threshold. Adjust the
@@ -195,9 +195,9 @@ class SizeBasedSegmentFlushThresholdComputer {
     double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;
     long targetRows;
     if (_sizeForLastSegment < optimalSegmentSizeBytesMin) {
-      targetRows = (long) (_rowsConsumedForLastSegment * 1.5);
+      targetRows = (long) (_rowsIndexedForLastSegment * 1.5);
     } else if (_sizeForLastSegment > optimalSegmentSizeBytesMax) {
-      targetRows = _rowsConsumedForLastSegment / 2;
+      targetRows = _rowsIndexedForLastSegment / 2;
     } else {
       targetRows = (long) (desiredSegmentSizeBytes * _segmentRowsToSizeRatio);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index c8bff45ad13..788c1b14bc5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1009,7 +1009,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private boolean startSegmentCommit() {
     SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
     
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
-        
.withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason);
+        
.withNumRows(_numRowsIndexed).withInstanceId(_instanceId).withReason(_stopReason);
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
@@ -1295,7 +1295,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
 
     
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
-        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
+        .withNumRows(_numRowsIndexed).withInstanceId(_instanceId);
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
@@ -1308,7 +1308,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
 
     
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
-        
.withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason)
+        
.withNumRows(_numRowsIndexed).withInstanceId(_instanceId).withReason(_stopReason)
         .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
@@ -1447,7 +1447,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     // Retry maybe once if leader is not found.
     SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
     
params.withStreamPartitionMsgOffset(_currentOffset.toString()).withSegmentName(_segmentNameStr)
-        
.withReason(_stopReason).withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
+        
.withReason(_stopReason).withNumRows(_numRowsIndexed).withInstanceId(_instanceId);
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to