J-HowHuang commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2021450301
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +863,150 @@ private List<String> getServerTag(String serverName) {
return instanceConfig.getTags();
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ if (newServersToConsumingSegmentMap.isEmpty()) {
+ return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0,
new HashMap<>(), new HashMap<>(),
+ new HashMap<>());
+ }
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) ->
a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+
newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new
HashMap<>();
+ uniqueConsumingSegments.forEach(segment ->
consumingSegmentZKmetadata.put(segment,
+
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableNameWithType,
consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge =
getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+ Map<String,
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+ consumingSegmentSummaryPerServer =
+ new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ consumingSegmentsOffsetsToCatchUpTopN =
getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ int totalOffsetsToCatchUp =
+
segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ consumingSegmentSummaryPerServer.put(server,
+ new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+ segments.size(), totalOffsetsToCatchUp));
+ });
+ } else {
+ consumingSegmentsOffsetsToCatchUpTopN = null;
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ consumingSegmentSummaryPerServer.put(server,
+ new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+ segments.size(), null));
+ });
+ }
+
+ Map<String, Integer> consumingSegmentsOldestTopN =
getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+ return new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+ newServersToConsumingSegmentMap.size(),
consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+ consumingSegmentSummaryPerServer);
+ }
+
+ private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+ Map<String, Integer> consumingSegmentsWithValue) {
+ Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+ consumingSegmentsWithValue.entrySet()
+ .stream()
+ .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> topNConsumingSegments.put(entry.getKey(),
entry.getValue()));
+ return topNConsumingSegments;
+ }
+
+ /**
+ * Fetches the age of each consuming segment.
+ * The age of a consuming segment is the time since the segment was created
in ZK, it could be different to when
+ * the stream should start to be consumed for the segment.
+ * consumingSegmentZKMetadata is a map from consuming segments to be moved
to their ZK metadata. Returns a map from
+ * segment name to the age of that consuming segment. Return null if failed
to obtain info for any consuming segment.
+ */
+ private Map<String, Integer> getConsumingSegmentsAge(String
tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+ long now = System.currentTimeMillis();
+ consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+ long creationTime = segmentZKMetadata.getCreationTime();
+ if (creationTime < 0) {
+ LOGGER.warn("Creation time is not found for segment: {} in table: {}",
s, tableNameWithType);
+ return;
+ }
+ consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+ }));
+ return consumingSegmentsAge;
+ }
+
+ @VisibleForTesting
+ ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+ if (_executorService == null || _connectionManager == null ||
_pinotHelixResourceManager == null) {
+ return null;
+ }
+ return new ConsumingSegmentInfoReader(_executorService,
_connectionManager, _pinotHelixResourceManager);
+ }
+
+ /**
+ * Fetches the consuming segment info for the table and calculates the
number of offsets to catch up for each
+ * consuming segment. consumingSegmentZKMetadata is a map from consuming
segments to be moved to their ZK metadata.
+ * Returns a map from segment name to the number of offsets to catch up for
that consuming
+ * segment. Return null if failed to obtain info for any consuming segment.
+ */
+ private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String
tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ if (consumingSegmentZKMetadata.isEmpty()) {
Review Comment:
This should never be reached now, since we added an early return when no
consuming segments are moved. I'll remove this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]