This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 014cdd6728d change consuming segment offset and age from int to long
(#16614)
014cdd6728d is described below
commit 014cdd6728d7d073564e35d75e67565c234a420c
Author: Jhow <[email protected]>
AuthorDate: Fri Aug 15 14:47:34 2025 -0700
change consuming segment offset and age from int to long (#16614)
---
.../core/rebalance/RebalanceSummaryResult.java | 18 +++++------
.../helix/core/rebalance/TableRebalancer.java | 36 +++++++++++-----------
.../rebalance/tenant/TenantRebalanceResult.java | 16 +++++-----
.../TableRebalancerClusterStatelessTest.java | 2 +-
...mingSegmentToBeMovedSummaryIntegrationTest.java | 2 +-
5 files changed, 37 insertions(+), 37 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index 6abcf61b1cf..c4e58beafd5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -344,8 +344,8 @@ public class RebalanceSummaryResult {
public static class ConsumingSegmentToBeMovedSummary {
private final int _numConsumingSegmentsToBeMoved;
private final int _numServersGettingConsumingSegmentsAdded;
- private final Map<String, Integer>
_consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
- private final Map<String, Integer>
_consumingSegmentsToBeMovedWithOldestAgeInMinutes;
+ private final Map<String, Long>
_consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+ private final Map<String, Long>
_consumingSegmentsToBeMovedWithOldestAgeInMinutes;
private final Map<String, ConsumingSegmentSummaryPerServer>
_serverConsumingSegmentSummary;
/**
@@ -381,9 +381,9 @@ public class RebalanceSummaryResult {
@JsonProperty("numConsumingSegmentsToBeMoved") int
numConsumingSegmentsToBeMoved,
@JsonProperty("numServersGettingConsumingSegmentsAdded") int
numServersGettingConsumingSegmentsAdded,
@JsonProperty("consumingSegmentsToBeMovedWithMostOffsetsToCatchUp")
@Nullable
- Map<String, Integer>
consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
+ Map<String, Long> consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
@JsonProperty("consumingSegmentsToBeMovedWithOldestAgeInMinutes")
@Nullable
- Map<String, Integer> consumingSegmentsToBeMovedWithOldestAgeInMinutes,
+ Map<String, Long> consumingSegmentsToBeMovedWithOldestAgeInMinutes,
@JsonProperty("serverConsumingSegmentSummary") @Nullable
Map<String, ConsumingSegmentSummaryPerServer>
serverConsumingSegmentSummary) {
_numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
@@ -404,12 +404,12 @@ public class RebalanceSummaryResult {
}
@JsonProperty
- public Map<String, Integer>
getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
+ public Map<String, Long>
getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
return _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
}
@JsonProperty
- public Map<String, Integer>
getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() {
+ public Map<String, Long>
getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() {
return _consumingSegmentsToBeMovedWithOldestAgeInMinutes;
}
@@ -420,7 +420,7 @@ public class RebalanceSummaryResult {
public static class ConsumingSegmentSummaryPerServer {
protected int _numConsumingSegmentsToBeAdded;
- protected int _totalOffsetsToCatchUpAcrossAllConsumingSegments;
+ protected long _totalOffsetsToCatchUpAcrossAllConsumingSegments;
/**
* Constructor for ConsumingSegmentSummaryPerServer
@@ -437,7 +437,7 @@ public class RebalanceSummaryResult {
public ConsumingSegmentSummaryPerServer(
@JsonProperty("numConsumingSegmentsToBeAdded") int
numConsumingSegmentsToBeAdded,
@JsonProperty("totalOffsetsToCatchUpAcrossAllConsumingSegments")
- int totalOffsetsToCatchUpAcrossAllConsumingSegments) {
+ long totalOffsetsToCatchUpAcrossAllConsumingSegments) {
_numConsumingSegmentsToBeAdded = numConsumingSegmentsToBeAdded;
_totalOffsetsToCatchUpAcrossAllConsumingSegments =
totalOffsetsToCatchUpAcrossAllConsumingSegments;
}
@@ -448,7 +448,7 @@ public class RebalanceSummaryResult {
}
@JsonProperty
- public int getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
+ public long getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
return _totalOffsetsToCatchUpAcrossAllConsumingSegments;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 24854b8bb75..014327e68a9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -1006,20 +1006,20 @@ public class TableRebalancer {
Map<String, SegmentZKMetadata> consumingSegmentZKMetadata = new
HashMap<>();
uniqueConsumingSegments.forEach(segment ->
consumingSegmentZKMetadata.put(segment,
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
tableNameWithType, segment)));
- Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ Map<String, Long> consumingSegmentsOffsetsToCatchUp =
getConsumingSegmentsOffsetsToCatchUp(tableConfig,
consumingSegmentZKMetadata, tableRebalanceLogger);
- Map<String, Integer> consumingSegmentsAge =
+ Map<String, Long> consumingSegmentsAge =
getConsumingSegmentsAge(consumingSegmentZKMetadata,
tableRebalanceLogger);
- Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+ Map<String, Long> consumingSegmentsOffsetsToCatchUpTopN;
Map<String,
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
consumingSegmentSummaryPerServer = new HashMap<>();
if (consumingSegmentsOffsetsToCatchUp != null) {
consumingSegmentsOffsetsToCatchUpTopN =
getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp,
TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
newServersToConsumingSegmentMap.forEach((server, segments) -> {
- int totalOffsetsToCatchUp =
-
segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ long totalOffsetsToCatchUp =
+
segments.stream().mapToLong(consumingSegmentsOffsetsToCatchUp::get).sum();
consumingSegmentSummaryPerServer.put(server,
new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
segments.size(), totalOffsetsToCatchUp));
@@ -1033,7 +1033,7 @@ public class TableRebalancer {
});
}
- Map<String, Integer> consumingSegmentsOldestTopN =
+ Map<String, Long> consumingSegmentsOldestTopN =
consumingSegmentsAge == null ? null
: getTopNConsumingSegmentWithValue(consumingSegmentsAge,
TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
@@ -1042,9 +1042,9 @@ public class TableRebalancer {
consumingSegmentSummaryPerServer);
}
- private static Map<String, Integer> getTopNConsumingSegmentWithValue(
- Map<String, Integer> consumingSegmentsWithValue, @Nullable Integer topN)
{
- Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+ private static Map<String, Long> getTopNConsumingSegmentWithValue(
+ Map<String, Long> consumingSegmentsWithValue, @Nullable Integer topN) {
+ Map<String, Long> topNConsumingSegments = new LinkedHashMap<>();
consumingSegmentsWithValue.entrySet()
.stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
@@ -1061,9 +1061,9 @@ public class TableRebalancer {
* segment name to the age of that consuming segment. Return null if failed
to obtain info for any consuming segment.
*/
@Nullable
- private Map<String, Integer> getConsumingSegmentsAge(Map<String,
SegmentZKMetadata> consumingSegmentZKMetadata,
+ private Map<String, Long> getConsumingSegmentsAge(Map<String,
SegmentZKMetadata> consumingSegmentZKMetadata,
Logger tableRebalanceLogger) {
- Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+ Map<String, Long> consumingSegmentsAge = new HashMap<>();
long now = System.currentTimeMillis();
try {
consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
@@ -1076,7 +1076,7 @@ public class TableRebalancer {
tableRebalanceLogger.warn("Creation time is not found for segment:
{}", s);
throw new RuntimeException("Creation time is not found");
}
- consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+ consumingSegmentsAge.put(s, (now - creationTime) / 60_000L);
}));
} catch (Exception e) {
return null;
@@ -1091,9 +1091,9 @@ public class TableRebalancer {
* segment. Return null if failed to obtain info for any consuming segment.
*/
@Nullable
- private Map<String, Integer>
getConsumingSegmentsOffsetsToCatchUp(TableConfig tableConfig,
+ private Map<String, Long> getConsumingSegmentsOffsetsToCatchUp(TableConfig
tableConfig,
Map<String, SegmentZKMetadata> consumingSegmentZKMetadata, Logger
tableRebalanceLogger) {
- Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+ Map<String, Long> segmentToOffsetsToCatchUp = new HashMap<>();
try {
for (Map.Entry<String, SegmentZKMetadata> entry :
consumingSegmentZKMetadata.entrySet()) {
String segmentName = entry.getKey();
@@ -1113,11 +1113,11 @@ public class TableRebalancer {
tableRebalanceLogger.warn("Cannot determine partition id for
realtime segment: {}", segmentName);
return null;
}
- Integer latestOffset = getLatestOffsetOfStream(tableConfig,
partitionId, tableRebalanceLogger);
+ Long latestOffset = getLatestOffsetOfStream(tableConfig, partitionId,
tableRebalanceLogger);
if (latestOffset == null) {
return null;
}
- int offsetsToCatchUp = latestOffset - Integer.parseInt(startOffset);
+ long offsetsToCatchUp = latestOffset - Long.parseLong(startOffset);
segmentToOffsetsToCatchUp.put(segmentName, offsetsToCatchUp);
}
} catch (Exception e) {
@@ -1142,7 +1142,7 @@ public class TableRebalancer {
}
@Nullable
- private Integer getLatestOffsetOfStream(TableConfig tableConfig, int
partitionId,
+ private Long getLatestOffsetOfStream(TableConfig tableConfig, int
partitionId,
Logger tableRebalanceLogger) {
try {
StreamPartitionMsgOffset partitionMsgOffset =
fetchStreamPartitionOffset(tableConfig, partitionId);
@@ -1150,7 +1150,7 @@ public class TableRebalancer {
tableRebalanceLogger.warn("Unsupported stream partition message offset
type: {}", partitionMsgOffset);
return null;
}
- return (int) ((LongMsgOffset) partitionMsgOffset).getOffset();
+ return ((LongMsgOffset) partitionMsgOffset).getOffset();
} catch (Exception e) {
tableRebalanceLogger.warn("Caught exception while trying to fetch stream
partition of partitionId: {}",
partitionId, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
index af99e72b306..a20655b870d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -504,8 +504,8 @@ public class TenantRebalanceResult {
int totalNumConsumingSegmentsToBeMoved = 0;
// Create maps to store all segments by offset and age
- Map<String, Integer> consumingSegmentsWithMostOffsetsPerTable = new
HashMap<>();
- Map<String, Integer> consumingSegmentsWithOldestAgePerTable = new
HashMap<>();
+ Map<String, Long> consumingSegmentsWithMostOffsetsPerTable = new
HashMap<>();
+ Map<String, Long> consumingSegmentsWithOldestAgePerTable = new HashMap<>();
// Aggregate ConsumingSegmentSummaryPerServer by server name across all
tables
Map<String, AggregatedConsumingSegmentSummaryPerServer> serverAggregates =
new HashMap<>();
@@ -516,7 +516,7 @@ public class TenantRebalanceResult {
// Add one segment with offsets for each table
if (summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() !=
null
&&
!summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().isEmpty()) {
- Map.Entry<String, Integer> consumingSegmentWithMostOffsetsToCatchUp =
+ Map.Entry<String, Long> consumingSegmentWithMostOffsetsToCatchUp =
summary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().entrySet().iterator().next();
consumingSegmentsWithMostOffsetsPerTable.put(consumingSegmentWithMostOffsetsToCatchUp.getKey(),
consumingSegmentWithMostOffsetsToCatchUp.getValue());
@@ -525,7 +525,7 @@ public class TenantRebalanceResult {
// Add all segments with ages
if (summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() != null
&&
!summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().isEmpty()) {
- Map.Entry<String, Integer> consumingSegmentWithOldestAge =
+ Map.Entry<String, Long> consumingSegmentWithOldestAge =
summary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().entrySet().iterator().next();
consumingSegmentsWithOldestAgePerTable.put(consumingSegmentWithOldestAge.getKey(),
consumingSegmentWithOldestAge.getValue());
@@ -548,14 +548,14 @@ public class TenantRebalanceResult {
}
// Sort consuming segments (top one from each table) by offsets and age
- Map<String, Integer> sortedConsumingSegmentsWithMostOffsetsPerTable = new
LinkedHashMap<>();
+ Map<String, Long> sortedConsumingSegmentsWithMostOffsetsPerTable = new
LinkedHashMap<>();
consumingSegmentsWithMostOffsetsPerTable.entrySet().stream()
- .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
+ .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.forEach(entry ->
sortedConsumingSegmentsWithMostOffsetsPerTable.put(entry.getKey(),
entry.getValue()));
- Map<String, Integer> sortedConsumingSegmentsWithOldestAgePerTable = new
LinkedHashMap<>();
+ Map<String, Long> sortedConsumingSegmentsWithOldestAgePerTable = new
LinkedHashMap<>();
consumingSegmentsWithOldestAgePerTable.entrySet().stream()
- .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
+ .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.forEach(entry ->
sortedConsumingSegmentsWithOldestAgePerTable.put(entry.getKey(),
entry.getValue()));
// Convert aggregated server data to final
ConsumingSegmentSummaryPerServer map
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index dbb87f8f893..599ec4a2801 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -2223,7 +2223,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
numServers);
- Iterator<Integer> offsetToCatchUpIterator =
+ Iterator<Long> offsetToCatchUpIterator =
consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().values().iterator();
assertEquals(offsetToCatchUpIterator.next(), mockOffsetBig);
if (FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS > 1) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
index 16999c887cc..fe800e60ef6 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
@@ -140,7 +140,7 @@ public class
KafkaConsumingSegmentToBeMovedSummaryIntegrationTest extends BaseRe
.getServerConsumingSegmentSummary()
.values()
.stream()
- .reduce(0, (a, b) -> a +
b.getTotalOffsetsToCatchUpAcrossAllConsumingSegments(), Integer::sum), 57801);
+ .reduce(0L, (a, b) -> a +
b.getTotalOffsetsToCatchUpAcrossAllConsumingSegments(), Long::sum), 57801);
// set includeConsuming to false
response = sendPostRequest(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]