somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2019837369
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult
calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to
rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType,
newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new
RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes,
totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with
rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) ->
a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+
newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new
HashMap<>();
+ uniqueConsumingSegments.forEach(segment ->
consumingSegmentZKmetadata.put(segment,
+
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableNameWithType,
consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge =
getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> topTenOffset;
+ Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer>
consumingSegmentSummaryPerServer =
+ new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ topTenOffset = new LinkedHashMap<>();
+ consumingSegmentsOffsetsToCatchUp.entrySet()
+ .stream()
+ .sorted(
+ Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> topTenOffset.put(entry.getKey(),
entry.getValue()));
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ int totalOffsetsToCatchUp =
+
segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ consumingSegmentSummaryPerServer.put(server, new
RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), totalOffsetsToCatchUp));
+ });
+ } else {
+ topTenOffset = null;
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ consumingSegmentSummaryPerServer.put(server, new
RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), null));
+ });
+ }
+
+ Map<String, Integer> oldestTenSegment;
+ oldestTenSegment = new LinkedHashMap<>();
+ consumingSegmentsAge.entrySet()
+ .stream()
+ .sorted(
+ Map.Entry.comparingByValue())
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> oldestTenSegment.put(entry.getKey(),
entry.getValue()));
+
+ return new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+ newServersToConsumingSegmentMap.size(), topTenOffset,
oldestTenSegment, consumingSegmentSummaryPerServer);
+ }
+
+ private Map<String, Integer> getConsumingSegmentsAge(String
tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+ long now = System.currentTimeMillis();
+ consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+ long creationTime = segmentZKMetadata.getCreationTime();
+ if (creationTime < 0) {
+ LOGGER.warn("Creation time is not found for segment: {} in table: {}",
s, tableNameWithType);
+ return;
+ }
+ consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+ }));
+ return consumingSegmentsAge;
+ }
+
+ @VisibleForTesting
+ ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+ if (_executorService == null || _connectionManager == null ||
_pinotHelixResourceManager == null) {
+ return null;
+ }
+ return new ConsumingSegmentInfoReader(_executorService,
_connectionManager, _pinotHelixResourceManager);
+ }
+
+ /**
+ * Fetches the consuming segment info for the table and calculates the
number of offsets to catch up for each
+ * consuming segment. consumingSegmentZKMetadata is a map from consuming
segments to be moved to their ZK metadata.
+ * Returns a map from segment name to the number of offsets to catch up for
that consuming
+ * segment. Return null if failed to obtain info for any consuming segment.
+ */
+ private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String
tableNameWithType,
Review Comment:
I think this is not addressed? Are you planning to address it or not? (Just to
make sure nothing is missed — it'll be good to reply to comments indicating whether
you've addressed them or decided to skip, so it's easier for the reviewer. Again,
I'm not insisting that this must be addressed; I just want to know whether it was 😅)
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult
calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to
rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType,
newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new
RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes,
totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with
rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) ->
a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+
newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new
HashMap<>();
+ uniqueConsumingSegments.forEach(segment ->
consumingSegmentZKmetadata.put(segment,
+
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableNameWithType,
consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge =
getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> topTenOffset;
+ Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer>
consumingSegmentSummaryPerServer =
+ new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ topTenOffset = new LinkedHashMap<>();
+ consumingSegmentsOffsetsToCatchUp.entrySet()
+ .stream()
+ .sorted(
+ Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> topTenOffset.put(entry.getKey(),
entry.getValue()));
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ int totalOffsetsToCatchUp =
+
segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ consumingSegmentSummaryPerServer.put(server, new
RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), totalOffsetsToCatchUp));
+ });
+ } else {
+ topTenOffset = null;
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ consumingSegmentSummaryPerServer.put(server, new
RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), null));
+ });
+ }
+
+ Map<String, Integer> oldestTenSegment;
+ oldestTenSegment = new LinkedHashMap<>();
+ consumingSegmentsAge.entrySet()
+ .stream()
+ .sorted(
+ Map.Entry.comparingByValue())
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> oldestTenSegment.put(entry.getKey(),
entry.getValue()));
+
+ return new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+ newServersToConsumingSegmentMap.size(), topTenOffset,
oldestTenSegment, consumingSegmentSummaryPerServer);
+ }
+
+ private Map<String, Integer> getConsumingSegmentsAge(String
tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+ long now = System.currentTimeMillis();
+ consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+ long creationTime = segmentZKMetadata.getCreationTime();
+ if (creationTime < 0) {
+ LOGGER.warn("Creation time is not found for segment: {} in table: {}",
s, tableNameWithType);
+ return;
+ }
+ consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+ }));
+ return consumingSegmentsAge;
+ }
+
+ @VisibleForTesting
+ ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+ if (_executorService == null || _connectionManager == null ||
_pinotHelixResourceManager == null) {
+ return null;
+ }
+ return new ConsumingSegmentInfoReader(_executorService,
_connectionManager, _pinotHelixResourceManager);
+ }
+
+ /**
+ * Fetches the consuming segment info for the table and calculates the
number of offsets to catch up for each
+ * consuming segment. consumingSegmentZKMetadata is a map from consuming
segments to be moved to their ZK metadata.
+ * Returns a map from segment name to the number of offsets to catch up for
that consuming
+ * segment. Return null if failed to obtain info for any consuming segment.
+ */
+ private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String
tableNameWithType,
Review Comment:
I think this is not addressed? Are you planning to address it or not? (Just to
make sure nothing is missed — it'll be good to reply to comments indicating whether
you've addressed them or decided to skip, so it's easier for the reviewer. Again,
I'm not insisting that this must be addressed; I just want to know whether it was 😅)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]