This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 014cdd6728d change consuming segment offset and age from int to long (#16614)
014cdd6728d is described below

commit 014cdd6728d7d073564e35d75e67565c234a420c
Author: Jhow <[email protected]>
AuthorDate: Fri Aug 15 14:47:34 2025 -0700

    change consuming segment offset and age from int to long (#16614)
---
 .../core/rebalance/RebalanceSummaryResult.java     | 18 +++++------
 .../helix/core/rebalance/TableRebalancer.java      | 36 +++++++++++-----------
 .../rebalance/tenant/TenantRebalanceResult.java    | 16 +++++-----
 .../TableRebalancerClusterStatelessTest.java       |  2 +-
 ...mingSegmentToBeMovedSummaryIntegrationTest.java |  2 +-
 5 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index 6abcf61b1cf..c4e58beafd5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -344,8 +344,8 @@ public class RebalanceSummaryResult {
   public static class ConsumingSegmentToBeMovedSummary {
     private final int _numConsumingSegmentsToBeMoved;
     private final int _numServersGettingConsumingSegmentsAdded;
-    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
-    private final Map<String, Integer> _consumingSegmentsToBeMovedWithOldestAgeInMinutes;
+    private final Map<String, Long> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    private final Map<String, Long> _consumingSegmentsToBeMovedWithOldestAgeInMinutes;
     private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;
 
     /**
@@ -381,9 +381,9 @@ public class RebalanceSummaryResult {
        @JsonProperty("numConsumingSegmentsToBeMoved") int numConsumingSegmentsToBeMoved,
        @JsonProperty("numServersGettingConsumingSegmentsAdded") int numServersGettingConsumingSegmentsAdded,
        @JsonProperty("consumingSegmentsToBeMovedWithMostOffsetsToCatchUp") @Nullable
-        Map<String, Integer> consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
+        Map<String, Long> consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
        @JsonProperty("consumingSegmentsToBeMovedWithOldestAgeInMinutes") @Nullable
-        Map<String, Integer> consumingSegmentsToBeMovedWithOldestAgeInMinutes,
+        Map<String, Long> consumingSegmentsToBeMovedWithOldestAgeInMinutes,
        @JsonProperty("serverConsumingSegmentSummary") @Nullable
        Map<String, ConsumingSegmentSummaryPerServer> serverConsumingSegmentSummary) {
       _numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
@@ -404,12 +404,12 @@ public class RebalanceSummaryResult {
     }
 
     @JsonProperty
-    public Map<String, Integer> getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
+    public Map<String, Long> getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
       return _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
     }
 
     @JsonProperty
-    public Map<String, Integer> getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() {
+    public Map<String, Long> getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() {
       return _consumingSegmentsToBeMovedWithOldestAgeInMinutes;
     }
 
@@ -420,7 +420,7 @@ public class RebalanceSummaryResult {
 
     public static class ConsumingSegmentSummaryPerServer {
       protected int _numConsumingSegmentsToBeAdded;
-      protected int _totalOffsetsToCatchUpAcrossAllConsumingSegments;
+      protected long _totalOffsetsToCatchUpAcrossAllConsumingSegments;
 
       /**
        * Constructor for ConsumingSegmentSummaryPerServer
@@ -437,7 +437,7 @@ public class RebalanceSummaryResult {
       public ConsumingSegmentSummaryPerServer(
          @JsonProperty("numConsumingSegmentsToBeAdded") int numConsumingSegmentsToBeAdded,
          @JsonProperty("totalOffsetsToCatchUpAcrossAllConsumingSegments")
-          int totalOffsetsToCatchUpAcrossAllConsumingSegments) {
+          long totalOffsetsToCatchUpAcrossAllConsumingSegments) {
        _numConsumingSegmentsToBeAdded = numConsumingSegmentsToBeAdded;
        _totalOffsetsToCatchUpAcrossAllConsumingSegments = totalOffsetsToCatchUpAcrossAllConsumingSegments;
       }
@@ -448,7 +448,7 @@ public class RebalanceSummaryResult {
       }
 
       @JsonProperty
-      public int getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
+      public long getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
         return _totalOffsetsToCatchUpAcrossAllConsumingSegments;
       }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 24854b8bb75..014327e68a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -1006,20 +1006,20 @@ public class TableRebalancer {
    Map<String, SegmentZKMetadata> consumingSegmentZKMetadata = new HashMap<>();
    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKMetadata.put(segment,
        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
-    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+    Map<String, Long> consumingSegmentsOffsetsToCatchUp =
        getConsumingSegmentsOffsetsToCatchUp(tableConfig, consumingSegmentZKMetadata, tableRebalanceLogger);
-    Map<String, Integer> consumingSegmentsAge =
+    Map<String, Long> consumingSegmentsAge =
        getConsumingSegmentsAge(consumingSegmentZKMetadata, tableRebalanceLogger);

-    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, Long> consumingSegmentsOffsetsToCatchUpTopN;
    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
        consumingSegmentSummaryPerServer = new HashMap<>();
    if (consumingSegmentsOffsetsToCatchUp != null) {
      consumingSegmentsOffsetsToCatchUpTopN =
          getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
      newServersToConsumingSegmentMap.forEach((server, segments) -> {
-        int totalOffsetsToCatchUp =
-            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        long totalOffsetsToCatchUp =
+            segments.stream().mapToLong(consumingSegmentsOffsetsToCatchUp::get).sum();
        consumingSegmentSummaryPerServer.put(server,
            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
                segments.size(), totalOffsetsToCatchUp));
@@ -1033,7 +1033,7 @@ public class TableRebalancer {
       });
     }
 
-    Map<String, Integer> consumingSegmentsOldestTopN =
+    Map<String, Long> consumingSegmentsOldestTopN =
         consumingSegmentsAge == null ? null
            : getTopNConsumingSegmentWithValue(consumingSegmentsAge, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
 
@@ -1042,9 +1042,9 @@ public class TableRebalancer {
         consumingSegmentSummaryPerServer);
   }
 
-  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
-      Map<String, Integer> consumingSegmentsWithValue, @Nullable Integer topN) {
-    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+  private static Map<String, Long> getTopNConsumingSegmentWithValue(
+      Map<String, Long> consumingSegmentsWithValue, @Nullable Integer topN) {
+    Map<String, Long> topNConsumingSegments = new LinkedHashMap<>();
     consumingSegmentsWithValue.entrySet()
         .stream()
         .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
@@ -1061,9 +1061,9 @@ public class TableRebalancer {
   * segment name to the age of that consuming segment. Return null if failed to obtain info for any consuming segment.
   */
  @Nullable
-  private Map<String, Integer> getConsumingSegmentsAge(Map<String, SegmentZKMetadata> consumingSegmentZKMetadata,
+  private Map<String, Long> getConsumingSegmentsAge(Map<String, SegmentZKMetadata> consumingSegmentZKMetadata,
      Logger tableRebalanceLogger) {
-    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    Map<String, Long> consumingSegmentsAge = new HashMap<>();
     long now = System.currentTimeMillis();
     try {
       consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
@@ -1076,7 +1076,7 @@ public class TableRebalancer {
          tableRebalanceLogger.warn("Creation time is not found for segment: {}", s);
           throw new RuntimeException("Creation time is not found");
         }
-        consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+        consumingSegmentsAge.put(s, (now - creationTime) / 60_000L);
       }));
     } catch (Exception e) {
       return null;
@@ -1091,9 +1091,9 @@ public class TableRebalancer {
    * segment. Return null if failed to obtain info for any consuming segment.
    */
   @Nullable
-  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(TableConfig tableConfig,
+  private Map<String, Long> getConsumingSegmentsOffsetsToCatchUp(TableConfig tableConfig,
      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata, Logger tableRebalanceLogger) {
-    Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+    Map<String, Long> segmentToOffsetsToCatchUp = new HashMap<>();
    try {
      for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
         String segmentName = entry.getKey();
@@ -1113,11 +1113,11 @@ public class TableRebalancer {
          tableRebalanceLogger.warn("Cannot determine partition id for realtime segment: {}", segmentName);
          return null;
        }
-        Integer latestOffset = getLatestOffsetOfStream(tableConfig, partitionId, tableRebalanceLogger);
+        Long latestOffset = getLatestOffsetOfStream(tableConfig, partitionId, tableRebalanceLogger);
         if (latestOffset == null) {
           return null;
         }
-        int offsetsToCatchUp = latestOffset - Integer.parseInt(startOffset);
+        long offsetsToCatchUp = latestOffset - Long.parseLong(startOffset);
         segmentToOffsetsToCatchUp.put(segmentName, offsetsToCatchUp);
       }
     } catch (Exception e) {
@@ -1142,7 +1142,7 @@ public class TableRebalancer {
   }
 
   @Nullable
-  private Integer getLatestOffsetOfStream(TableConfig tableConfig, int partitionId,
+  private Long getLatestOffsetOfStream(TableConfig tableConfig, int partitionId,
       Logger tableRebalanceLogger) {
     try {
      StreamPartitionMsgOffset partitionMsgOffset = fetchStreamPartitionOffset(tableConfig, partitionId);
@@ -1150,7 +1150,7 @@ public class TableRebalancer {
        tableRebalanceLogger.warn("Unsupported stream partition message offset type: {}", partitionMsgOffset);
         return null;
       }
-      return (int) ((LongMsgOffset) partitionMsgOffset).getOffset();
+      return ((LongMsgOffset) partitionMsgOffset).getOffset();
     } catch (Exception e) {
      tableRebalanceLogger.warn("Caught exception while trying to fetch stream partition of partitionId: {}",
           partitionId, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
index af99e72b306..a20655b870d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -504,8 +504,8 @@ public class TenantRebalanceResult {
     int totalNumConsumingSegmentsToBeMoved = 0;
 
     // Create maps to store all segments by offset and age
-    Map<String, Integer> consumingSegmentsWithMostOffsetsPerTable = new HashMap<>();
-    Map<String, Integer> consumingSegmentsWithOldestAgePerTable = new HashMap<>();
+    Map<String, Long> consumingSegmentsWithMostOffsetsPerTable = new HashMap<>();
+    Map<String, Long> consumingSegmentsWithOldestAgePerTable = new HashMap<>();

    // Aggregate ConsumingSegmentSummaryPerServer by server name across all tables
    Map<String, AggregatedConsumingSegmentSummaryPerServer> serverAggregates = new HashMap<>();
@@ -516,7 +516,7 @@ public class TenantRebalanceResult {
       // Add one segment with offsets for each table
      if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() != null
          && !summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().isEmpty()) {
-        Map.Entry<String, Integer> consumingSegmentWithMostOffsetsToCatchUp =
+        Map.Entry<String, Long> consumingSegmentWithMostOffsetsToCatchUp =
            summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().entrySet().iterator().next();
        consumingSegmentsWithMostOffsetsPerTable.put(consumingSegmentWithMostOffsetsToCatchUp.getKey(),
            consumingSegmentWithMostOffsetsToCatchUp.getValue());
@@ -525,7 +525,7 @@ public class TenantRebalanceResult {
       // Add all segments with ages
      if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != null
          && !summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().isEmpty()) {
-        Map.Entry<String, Integer> consumingSegmentWithOldestAge =
+        Map.Entry<String, Long> consumingSegmentWithOldestAge =
            summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().entrySet().iterator().next();
        consumingSegmentsWithOldestAgePerTable.put(consumingSegmentWithOldestAge.getKey(),
            consumingSegmentWithOldestAge.getValue());
@@ -548,14 +548,14 @@ public class TenantRebalanceResult {
     }
 
     // Sort consuming segments (top one from each table) by offsets and age
-    Map<String, Integer> sortedConsumingSegmentsWithMostOffsetsPerTable = new LinkedHashMap<>();
+    Map<String, Long> sortedConsumingSegmentsWithMostOffsetsPerTable = new LinkedHashMap<>();
    consumingSegmentsWithMostOffsetsPerTable.entrySet().stream()
-        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
+        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .forEach(entry -> sortedConsumingSegmentsWithMostOffsetsPerTable.put(entry.getKey(), entry.getValue()));

-    Map<String, Integer> sortedConsumingSegmentsWithOldestAgePerTable = new LinkedHashMap<>();
+    Map<String, Long> sortedConsumingSegmentsWithOldestAgePerTable = new LinkedHashMap<>();
    consumingSegmentsWithOldestAgePerTable.entrySet().stream()
-        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
+        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .forEach(entry -> sortedConsumingSegmentsWithOldestAgePerTable.put(entry.getKey(), entry.getValue()));

    // Convert aggregated server data to final ConsumingSegmentSummaryPerServer map
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index dbb87f8f893..599ec4a2801 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -2223,7 +2223,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
    assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
        FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
    assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), numServers);
-    Iterator<Integer> offsetToCatchUpIterator =
+    Iterator<Long> offsetToCatchUpIterator =
        consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().values().iterator();
     assertEquals(offsetToCatchUpIterator.next(), mockOffsetBig);
     if (FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS > 1) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
index 16999c887cc..fe800e60ef6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
@@ -140,7 +140,7 @@ public class KafkaConsumingSegmentToBeMovedSummaryIntegrationTest extends BaseRe
        .getServerConsumingSegmentSummary()
        .values()
        .stream()
-        .reduce(0, (a, b) -> a + b.getTotalOffsetsToCatchUpAcrossAllConsumingSegments(), Integer::sum), 57801);
+        .reduce(0L, (a, b) -> a + b.getTotalOffsetsToCatchUpAcrossAllConsumingSegments(), Long::sum), 57801);
 
     // set includeConsuming to false
     response = sendPostRequest(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to