somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2017439035
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
}
}
+ public static class ConsumingSegmentSummaryPerServer {
+ private final int _numConsumingSegmentToBeAdded;
+ private final Integer _totalOffsetsNeedToCatchUp;
+
+ @JsonCreator
+ public ConsumingSegmentSummaryPerServer(
Review Comment:
nit: can you move this class to the bottom of `RebalanceSummaryResult`? I've found the code easier to read when the main class's constructor and fields are at the top.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
}
}
+ public static class ConsumingSegmentSummaryPerServer {
+ private final int _numConsumingSegmentToBeAdded;
+ private final Integer _totalOffsetsNeedToCatchUp;
+
+ @JsonCreator
+ public ConsumingSegmentSummaryPerServer(
+ @JsonProperty("numConsumingSegmentToBeAdded") int numConsumingSegmentToBeAdded,
+ @JsonProperty("totalOffsetsNeedToCatchUp") @Nullable Integer totalOffsetsNeedToCatchUp) {
Review Comment:
nit: this name is confusing. Can we rename it to `totalNumOffsetsToReConsumeToCatchUp`, or something along those lines?
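For context on the suggested name: in the quoted `getConsumingSegmentsOffsetsToCatchUp`, the per-segment value is the sum, over the entries of `_latestUpstreamOffsetMap`, of the latest upstream offset minus the segment's start offset from ZK metadata, i.e. roughly how many offsets a newly assigned server would have to re-consume. A minimal sketch of that arithmetic follows; the class and method names are hypothetical, and it parses with `Long.parseLong` rather than `Integer.parseInt` on the assumption that stream offsets (Kafka offsets are 64-bit) can exceed `Integer.MAX_VALUE`.

```java
import java.util.Map;

public class OffsetsToReConsumeSketch {
  /**
   * Hypothetical helper: sums (latest upstream offset - start offset) over every entry of the
   * upstream offset map, mirroring the quoted loop but parsing with Long.parseLong so that
   * 64-bit offsets neither throw NumberFormatException nor overflow.
   */
  public static long totalNumOffsetsToReConsumeToCatchUp(Map<String, String> latestUpstreamOffsetMap,
      String startOffset) {
    long start = Long.parseLong(startOffset);
    long total = 0L;
    for (String latestOffset : latestUpstreamOffsetMap.values()) {
      total += Long.parseLong(latestOffset) - start;
    }
    return total;
  }

  public static void main(String[] args) {
    // Segment started consuming at offset 1000; the partition's latest upstream offset is 1500.
    System.out.println(totalNumOffsetsToReConsumeToCatchUp(Map.of("0", "1500"), "1000")); // prints 500
  }
}
```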
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+ newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+ uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+ ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> topTenOffset;
+ Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer =
+ new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ topTenOffset = new LinkedHashMap<>();
+ consumingSegmentsOffsetsToCatchUp.entrySet()
+ .stream()
+ .sorted(
+ Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ int totalOffsetsToCatchUp =
+ segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), totalOffsetsToCatchUp));
+ });
+ } else {
+ topTenOffset = null;
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), null));
+ });
+ }
+
+ Map<String, Integer> oldestTenSegment;
+ oldestTenSegment = new LinkedHashMap<>();
+ consumingSegmentsAge.entrySet()
+ .stream()
+ .sorted(
+ Map.Entry.comparingByValue())
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue()));
+
+ return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+ newServersToConsumingSegmentMap.size(), topTenOffset, oldestTenSegment, consumingSegmentSummaryPerServer);
+ }
+
+ private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
Review Comment:
nit: let's rename to: `getTopConsumingSegmentsByOldestCreationTime`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+ newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+ uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+ ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> topTenOffset;
+ Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer =
+ new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ topTenOffset = new LinkedHashMap<>();
+ consumingSegmentsOffsetsToCatchUp.entrySet()
+ .stream()
+ .sorted(
+ Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ int totalOffsetsToCatchUp =
+ segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), totalOffsetsToCatchUp));
+ });
+ } else {
+ topTenOffset = null;
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), null));
+ });
+ }
+
+ Map<String, Integer> oldestTenSegment;
+ oldestTenSegment = new LinkedHashMap<>();
+ consumingSegmentsAge.entrySet()
+ .stream()
+ .sorted(
+ Map.Entry.comparingByValue())
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue()));
+
+ return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+ newServersToConsumingSegmentMap.size(), topTenOffset, oldestTenSegment, consumingSegmentSummaryPerServer);
+ }
+
+ private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+ long now = System.currentTimeMillis();
+ consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+ long creationTime = segmentZKMetadata.getCreationTime();
+ if (creationTime < 0) {
+ LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+ return;
+ }
+ consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+ }));
+ return consumingSegmentsAge;
+ }
+
+ @VisibleForTesting
+ ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+ if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+ return null;
+ }
+ return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+ }
+
+ /**
+ * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+ * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+ * Returns a map from segment name to the number of offsets to catch up for that consuming
+ * segment. Return null if failed to obtain info for any consuming segment.
+ */
+ private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ if (consumingSegmentZKMetadata.isEmpty()) {
+ LOGGER.info("No consuming segments being moved for table: {}", tableNameWithType);
+ return new HashMap<>();
+ }
+ ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+ if (consumingSegmentInfoReader == null) {
+ LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+ tableNameWithType);
+ return null;
+ }
+ Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+ try {
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+ for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
+ String segmentName = entry.getKey();
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList =
+ consumingSegmentsInfoMap._segmentToConsumingInfoMap.getOrDefault(segmentName, null);
+ SegmentZKMetadata segmentZKMetadata = entry.getValue();
+ if (segmentZKMetadata == null) {
+ LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+ return null;
+ }
+ String startOffset = segmentZKMetadata.getStartOffset();
+ if (startOffset == null) {
+ LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+ return null;
+ }
+ if (consumingSegmentInfoList != null && !consumingSegmentInfoList.isEmpty()) {
+ // this value should be the same regardless of which server the consuming segment info is from, use the
+ // first in the list here
+ int offsetsToCatchUp =
+ consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()
+ .stream().mapToInt(offset -> Integer.parseInt(offset) - Integer.parseInt(startOffset)).sum();
+ segmentToOffsetsToCatchUp.put(segmentName, offsetsToCatchUp);
+ } else {
+ LOGGER.warn("No available consuming segment info from any server. Segment: {} in table: {}", segmentName,
+ tableNameWithType);
+ return null;
+ }
+ }
+ } catch (InvalidConfigException e) {
+ LOGGER.warn("Caught exception while trying to fetch consuming segment info for table: {}", tableNameWithType, e);
+ return null;
+ }
+ LOGGER.info("Successfully fetched consuming segments info for table: {}", tableNameWithType);
Review Comment:
nit: since we've identified a logging improvement to always include the rebalance job ID, can you make sure it's included in all the logs added in this PR?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
Review Comment:
Can we return early here if the `uniqueConsumingSegments` set is empty? That way the remainder of this function can assume that `CONSUMING` segments are actually being moved.
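One way the early return could look, as a minimal sketch: `SummarySketch` is a hypothetical stand-in for `RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary` carrying only the two counts, and the ZK metadata and server lookups are elided. It also swaps the `reduce(0, (a, b) -> a + b.size(), Integer::sum)` call for the equivalent `mapToInt(Set::size).sum()`.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class EarlyReturnSketch {
  // Hypothetical stand-in for RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.
  public static final class SummarySketch {
    public final int numConsumingSegmentsToBeMoved;
    public final int numServersGettingConsumingSegmentsAdded;

    SummarySketch(int numConsumingSegmentsToBeMoved, int numServersGettingConsumingSegmentsAdded) {
      this.numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
      this.numServersGettingConsumingSegmentsAdded = numServersGettingConsumingSegmentsAdded;
    }
  }

  public static SummarySketch getConsumingSegmentSummary(Map<String, Set<String>> newServersToConsumingSegmentMap) {
    Set<String> uniqueConsumingSegments = newServersToConsumingSegmentMap.values().stream()
        .flatMap(Set::stream)
        .collect(Collectors.toSet());
    if (uniqueConsumingSegments.isEmpty()) {
      // No CONSUMING segment is moving: skip the ZK metadata reads and server queries entirely.
      return new SummarySketch(0, 0);
    }
    // From here on, at least one CONSUMING segment is known to be moving.
    int numConsumingSegmentsToBeMoved = newServersToConsumingSegmentMap.values().stream()
        .mapToInt(Set::size)
        .sum();
    // ... ZK metadata lookups and offset/age calculations would go here ...
    return new SummarySketch(numConsumingSegmentsToBeMoved, newServersToConsumingSegmentMap.size());
  }

  public static void main(String[] args) {
    System.out.println(getConsumingSegmentSummary(Map.of()).numConsumingSegmentsToBeMoved); // prints 0
  }
}
```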
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+ newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+ uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+ ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> topTenOffset;
+ Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer =
+ new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ topTenOffset = new LinkedHashMap<>();
+ consumingSegmentsOffsetsToCatchUp.entrySet()
+ .stream()
+ .sorted(
+ Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));
Review Comment:
Any chance this can be made into a util? The logic looks the same for both this and the top-segments-by-age calculation.
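A possible shape for such a util, as a sketch: `topN` is a hypothetical name, and the caller picks the ordering through the comparator. Note that a larger age in minutes means an older segment, so if the goal is the *oldest* consuming segments, the age map would need `Comparator.reverseOrder()`; the quoted hunk sorts ages with `Map.Entry.comparingByValue()` (ascending), which puts the most recently created segments first.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;

public class TopNUtilSketch {
  /**
   * Returns up to n entries of the map, ordered by the given value comparator, in a
   * LinkedHashMap so that iteration order matches the sort order.
   */
  public static <K, V> Map<K, V> topN(Map<K, V> map, int n, Comparator<? super V> valueComparator) {
    Comparator<Map.Entry<K, V>> byValue = Map.Entry.comparingByValue(valueComparator);
    Map<K, V> result = new LinkedHashMap<>();
    map.entrySet().stream()
        .sorted(byValue)
        .limit(n)
        .forEach(entry -> result.put(entry.getKey(), entry.getValue()));
    return result;
  }

  public static void main(String[] args) {
    Map<String, Integer> offsetsToCatchUp = Map.of("seg_a", 5, "seg_b", 20, "seg_c", 10);
    // Largest values first: the segments with the most offsets to catch up.
    System.out.println(topN(offsetsToCatchUp, 2, Comparator.<Integer>reverseOrder()).keySet()); // prints [seg_b, seg_c]
  }
}
```

The same helper would then serve both call sites: `reverseOrder()` for offsets to catch up, and whichever order matches the intended meaning of "oldest" for ages.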
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -252,6 +338,7 @@ public static class SegmentInfo {
private final RebalanceChangeInfo _numSegmentsInSingleReplica;
@JsonInclude(JsonInclude.Include.NON_NULL)
private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas;
+ private final ConsumingSegmentToBeMovedSummary _consumingSegmentToBeMovedSummary;
Review Comment:
Can we also add the `@JsonInclude(JsonInclude.Include.NON_NULL)` annotation here?
We can probably revisit all of these in a later PR and remove them where they aren't necessary.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) {
LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
int existingReplicationFactor = 0;
int newReplicationFactor = 0;
Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+ Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
existingReplicationFactor = entrySet.getValue().size();
for (String segmentKey : entrySet.getValue().keySet()) {
existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ if (existingServersToConsumingSegmentMap != null && entrySet.getValue()
+ .get(segmentKey)
+ .equals(SegmentStateModel.CONSUMING)) {
+ existingServersToConsumingSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ }
Review Comment:
Shouldn't you only add the segment if all of its servers' states are CONSUMING? Or maybe I misunderstood this code?
It would be good to have a test case for this scenario too (I have yet to look at the tests),
e.g.
```
segment_1:
server_1: CONSUMING,
server_2: CONSUMING,
server_3: CONSUMING
segment_2:
server_1: ONLINE,
server_2: CONSUMING,
server_3: CONSUMING
```
In the above, only segment_1 should be added, right?
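A minimal sketch of the stricter rule, assuming the assignment map is keyed segment -> (instance -> state) as `currentAssignment`/`targetAssignment` are: a segment is recorded for its servers only when every replica's state is `CONSUMING`. The `CONSUMING` constant is a local stand-in for `SegmentStateModel.CONSUMING`, the class and method names are hypothetical, and the loop uses the `segmentName`/`instanceName` naming suggested elsewhere in this review. On the example above, only `segment_1` is kept.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AllReplicasConsumingSketch {
  // Local stand-in for SegmentStateModel.CONSUMING.
  static final String CONSUMING = "CONSUMING";

  /**
   * Builds server -> CONSUMING segments from a segment -> (instance -> state) assignment,
   * including a segment only if every one of its replicas is in the CONSUMING state.
   */
  public static Map<String, Set<String>> serversToConsumingSegments(Map<String, Map<String, String>> assignment) {
    Map<String, Set<String>> serversToConsumingSegments = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : assignment.entrySet()) {
      String segmentName = entry.getKey();
      Map<String, String> instanceStateMap = entry.getValue();
      if (!instanceStateMap.values().stream().allMatch(CONSUMING::equals)) {
        continue; // at least one replica is not CONSUMING (e.g. ONLINE), so skip this segment
      }
      for (String instanceName : instanceStateMap.keySet()) {
        serversToConsumingSegments.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
      }
    }
    return serversToConsumingSegments;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> assignment = Map.of(
        "segment_1", Map.of("server_1", CONSUMING, "server_2", CONSUMING, "server_3", CONSUMING),
        "segment_2", Map.of("server_1", "ONLINE", "server_2", CONSUMING, "server_3", CONSUMING));
    // Every server maps to {segment_1}; segment_2 is excluded because server_1 hosts it as ONLINE.
    System.out.println(serversToConsumingSegments(assignment));
  }
}
```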
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) {
LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
int existingReplicationFactor = 0;
int newReplicationFactor = 0;
Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+ Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
existingReplicationFactor = entrySet.getValue().size();
for (String segmentKey : entrySet.getValue().keySet()) {
existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ if (existingServersToConsumingSegmentMap != null && entrySet.getValue()
+ .get(segmentKey)
+ .equals(SegmentStateModel.CONSUMING)) {
+ existingServersToConsumingSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ }
}
}
for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
newReplicationFactor = entrySet.getValue().size();
for (String segmentKey : entrySet.getValue().keySet()) {
newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ if (newServersToConsumingSegmentMap != null && entrySet.getValue()
+ .get(segmentKey)
+ .equals(SegmentStateModel.CONSUMING)) {
+ newServersToConsumingSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ }
Review Comment:
Shouldn't you only add the segment if all of its servers' states are CONSUMING? Or maybe I misunderstood this code?
It would be good to have a test case for this scenario too (I have yet to look at the tests).
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
}
}
+ public static class ConsumingSegmentSummaryPerServer {
+ private final int _numConsumingSegmentToBeAdded;
+ private final Integer _totalOffsetsNeedToCatchUp;
+
+ @JsonCreator
+ public ConsumingSegmentSummaryPerServer(
+ @JsonProperty("numConsumingSegmentToBeAdded") int numConsumingSegmentToBeAdded,
+ @JsonProperty("totalOffsetsNeedToCatchUp") @Nullable Integer totalOffsetsNeedToCatchUp) {
+ _numConsumingSegmentToBeAdded = numConsumingSegmentToBeAdded;
+ _totalOffsetsNeedToCatchUp = totalOffsetsNeedToCatchUp;
+ }
+
+ @JsonProperty
+ public int getNumConsumingSegmentToBeAdded() {
+ return _numConsumingSegmentToBeAdded;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Integer getTotalOffsetsNeedToCatchUp() {
+ return _totalOffsetsNeedToCatchUp;
+ }
+ }
+
+ public static class ConsumingSegmentToBeMovedSummary {
+ private final int _numConsumingSegmentsToBeMoved;
+ private final int _numServerGettingConsumingSegmentsAdded;
+ private final Map<String, Integer> _topConsumingSegmentsOffsetsToCatchUp;
+ private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
Review Comment:
Can you add a comment explaining what "oldest" means here, i.e. that it's based on the segment creation time? That way people won't assume it has anything to do with when the event was added to Kafka.
nit: perhaps rename it to `oldestCreationTimeInMinutesConsumingSegmentsToBeMoved` (I know it's long, but I don't want people to think "oldest" refers to the Kafka timestamp).
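To illustrate what "oldest" means here, a sketch of an age computed from the segment's creation time (epoch milliseconds, as read from segment ZK metadata) rather than from any stream timestamp; the names are hypothetical. One detail worth noting: in the quoted `getConsumingSegmentsAge`, `(int) (now - creationTime) / 60_000` applies the `(int)` cast before the division, so the millisecond difference is narrowed to `int` first and overflows once it exceeds `Integer.MAX_VALUE` ms (about 24.8 days). Converting while still in `long` avoids that.

```java
import java.util.concurrent.TimeUnit;

public class SegmentAgeSketch {
  /**
   * Minutes elapsed since the segment was created, based on the creation time stored in the
   * segment's ZK metadata (epoch millis). This says nothing about when the events inside the
   * segment were written to the stream. Returns -1 if the creation time is unknown (negative).
   */
  public static long ageInMinutesSinceCreation(long creationTimeMs, long nowMs) {
    if (creationTimeMs < 0) {
      return -1L;
    }
    // Convert while still a long; narrowing the millisecond difference to int first would overflow
    // for segments older than about 24.8 days.
    return TimeUnit.MILLISECONDS.toMinutes(nowMs - creationTimeMs);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long thirtyDaysAgo = now - TimeUnit.DAYS.toMillis(30);
    System.out.println(ageInMinutesSinceCreation(thirtyDaysAgo, now)); // prints 43200
  }
}
```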
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java:
##########
@@ -214,7 +214,7 @@ public String forTableRebalance(String tableName, String tableType, boolean dryR
if (reassignInstances) {
stringBuilder.append("&reassignInstances=").append(reassignInstances);
}
- if (includeConsuming) {
+ if (!includeConsuming) {
Review Comment:
why was this change made?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,92 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
}
}
+ public static class ConsumingSegmentSummaryPerServer {
+ private final int _numConsumingSegmentToBeAdded;
+ private final Integer _totalOffsetsNeedToCatchUp;
+
+ @JsonCreator
+ public ConsumingSegmentSummaryPerServer(
+ @JsonProperty("numConsumingSegmentToBeAdded") int numConsumingSegmentToBeAdded,
+ @JsonProperty("totalOffsetsNeedToCatchUp") @Nullable Integer totalOffsetsNeedToCatchUp) {
+ _numConsumingSegmentToBeAdded = numConsumingSegmentToBeAdded;
+ _totalOffsetsNeedToCatchUp = totalOffsetsNeedToCatchUp;
+ }
+
+ @JsonProperty
+ public int getNumConsumingSegmentToBeAdded() {
+ return _numConsumingSegmentToBeAdded;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Integer getTotalOffsetsNeedToCatchUp() {
+ return _totalOffsetsNeedToCatchUp;
+ }
+ }
+
+ public static class ConsumingSegmentToBeMovedSummary {
+ private final int _numConsumingSegmentsToBeMoved;
+ private final int _numServerGettingConsumingSegmentsAdded;
+ private final Map<String, Integer> _topConsumingSegmentsOffsetsToCatchUp;
Review Comment:
nit: let's rename this to `topConsumingSegmentsWithMaxOffsetsToCatchUp`, or something similar?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -711,6 +750,15 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
}
}
+ if (existingServersToConsumingSegmentMap != null && newServersToConsumingSegmentMap != null) {
+ for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
+ String server = entry.getKey();
+ entry.getValue().removeAll(existingServersToConsumingSegmentMap.getOrDefault(server, Collections.emptySet()));
+ }
+ // turn the map into server -> consuming segments added
Review Comment:
nit: should this comment be moved further up? I think the code below it just removes empty sets.
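A minimal sketch of the whole step, assuming both maps are server -> CONSUMING segments and that the value sets in the new map are mutable: first drop the segments each server already hosts as CONSUMING, then drop servers left with nothing added. The method name is hypothetical; it also pulls `entry.getValue()` into a local variable, in line with another nit in this review.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ConsumingSegmentsAddedSketch {
  /**
   * Turns newServersToConsumingSegmentMap into server -> CONSUMING segments added, in place:
   * removes segments the server already hosts as CONSUMING, then removes servers whose set
   * ends up empty.
   */
  public static void retainOnlyAddedSegments(Map<String, Set<String>> newServersToConsumingSegmentMap,
      Map<String, Set<String>> existingServersToConsumingSegmentMap) {
    for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
      String server = entry.getKey();
      Set<String> consumingSegments = entry.getValue();
      consumingSegments.removeAll(existingServersToConsumingSegmentMap.getOrDefault(server, Collections.emptySet()));
    }
    // Servers that gain no new CONSUMING segment are dropped from the summary.
    newServersToConsumingSegmentMap.values().removeIf(Set::isEmpty);
  }

  public static void main(String[] args) {
    Map<String, Set<String>> added = new HashMap<>();
    added.put("server_1", new HashSet<>(Set.of("seg_1", "seg_2")));
    added.put("server_2", new HashSet<>(Set.of("seg_1")));
    retainOnlyAddedSegments(added, Map.of("server_1", Set.of("seg_1"), "server_2", Set.of("seg_1")));
    System.out.println(added); // prints {server_1=[seg_2]}
  }
}
```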
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -314,6 +404,12 @@ public RebalanceChangeInfo getNumSegmentsInSingleReplica() {
public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
return _numSegmentsAcrossAllReplicas;
}
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
Review Comment:
nit: can you add this annotation to all getters in the `RebalanceSummaryResult` class whose fields are annotated with it, and do a quick verification?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) {
LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
int existingReplicationFactor = 0;
int newReplicationFactor = 0;
Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+ Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
existingReplicationFactor = entrySet.getValue().size();
Review Comment:
nit: let's add a variable for `entrySet.getKey()` and use it instead of calling `entrySet.getKey()` repeatedly,
e.g. `String segmentName = entrySet.getKey();`
Can you also rename `segmentKey` -> `instanceName`? Your PR will most likely be merged before mine, which fixes this.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -625,22 +651,35 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) {
LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
int existingReplicationFactor = 0;
int newReplicationFactor = 0;
Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+ Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
existingReplicationFactor = entrySet.getValue().size();
for (String segmentKey : entrySet.getValue().keySet()) {
existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ if (existingServersToConsumingSegmentMap != null && entrySet.getValue()
+ .get(segmentKey)
+ .equals(SegmentStateModel.CONSUMING)) {
+ existingServersToConsumingSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ }
}
}
for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
newReplicationFactor = entrySet.getValue().size();
Review Comment:
nit: let's add a variable for `entrySet.getKey()` and use it instead of calling `entrySet.getKey()` repeatedly,
e.g. `String segmentName = entrySet.getKey();`
Can you also rename `segmentKey` -> `instanceName`? Your PR will most likely be merged before mine, which fixes this.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -711,6 +750,15 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
}
}
+ if (existingServersToConsumingSegmentMap != null && newServersToConsumingSegmentMap != null) {
+ for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
+ String server = entry.getKey();
+ entry.getValue().removeAll(existingServersToConsumingSegmentMap.getOrDefault(server, Collections.emptySet()));
Review Comment:
nit: recommend having a variable for `entry.getValue()` since it is accessed
more than once
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableNameWithType, newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+ newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+ uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+ ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> topTenOffset;
+ Map<String, RebalanceSummaryResult.ConsumingSegmentSummaryPerServer> consumingSegmentSummaryPerServer =
+ new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ topTenOffset = new LinkedHashMap<>();
+ consumingSegmentsOffsetsToCatchUp.entrySet()
+ .stream()
+ .sorted(
+ Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> topTenOffset.put(entry.getKey(), entry.getValue()));
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ int totalOffsetsToCatchUp =
+ segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), totalOffsetsToCatchUp));
+ });
+ } else {
+ topTenOffset = null;
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ consumingSegmentSummaryPerServer.put(server, new RebalanceSummaryResult.ConsumingSegmentSummaryPerServer(
+ segments.size(), null));
+ });
+ }
+
+ Map<String, Integer> oldestTenSegment;
+ oldestTenSegment = new LinkedHashMap<>();
+ consumingSegmentsAge.entrySet()
+ .stream()
+ .sorted(
+ Map.Entry.comparingByValue())
+ .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+ .forEach(entry -> oldestTenSegment.put(entry.getKey(), entry.getValue()));
+
+ return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+ newServersToConsumingSegmentMap.size(), topTenOffset, oldestTenSegment, consumingSegmentSummaryPerServer);
+ }
+
+ private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+ long now = System.currentTimeMillis();
+ consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+ long creationTime = segmentZKMetadata.getCreationTime();
+ if (creationTime < 0) {
+ LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+ return;
+ }
+ consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+ }));
+ return consumingSegmentsAge;
+ }
+
+ @VisibleForTesting
+ ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+ if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+ return null;
+ }
+ return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+ }
+
+ /**
+ * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+ * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+ * Returns a map from segment name to the number of offsets to catch up for that consuming
+ * segment. Return null if failed to obtain info for any consuming segment.
+ */
+ private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
Review Comment:
nit: let's rename: `getTopConsumingSegmentsByNumOffsetsToCatchUp`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+ Map<String, Integer> topTenOffset;
Review Comment:
nit: let's rename to topOffsets?
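A sketch of the selection behind the renamed map, plus the per-server totals computed next to it. Names are hypothetical. Two deliberate differences from the quoted code, both assumptions of this sketch rather than the PR's behavior: totals are summed as `long` (stream offsets such as Kafka's are 64-bit), and a segment missing from the offsets map counts as 0 instead of failing, since `mapToInt(consumingSegmentsOffsetsToCatchUp::get)` throws a `NullPointerException` when it unboxes a missing value:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class OffsetSummary {
  /** Up to n segments with the most offsets to catch up, largest first. */
  static Map<String, Integer> topOffsets(Map<String, Integer> offsetsToCatchUp, int n) {
    // LinkedHashMap preserves the sorted insertion order.
    Map<String, Integer> topOffsets = new LinkedHashMap<>();
    offsetsToCatchUp.entrySet().stream()
        .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()))
        .limit(n)
        .forEachOrdered(e -> topOffsets.put(e.getKey(), e.getValue()));
    return topOffsets;
  }

  /** Per-server total of offsets to catch up; missing segments count as 0 (sketch assumption). */
  static Map<String, Long> totalOffsetsPerServer(Map<String, Set<String>> serverToSegments,
      Map<String, Integer> offsetsToCatchUp) {
    Map<String, Long> totals = new HashMap<>();
    serverToSegments.forEach((server, segments) -> totals.put(server,
        segments.stream().mapToLong(segment -> offsetsToCatchUp.getOrDefault(segment, 0)).sum()));
    return totals;
  }
}
```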
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+ private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
Review Comment:
nit: let's add a comment to explain what this function does as well?
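One way the function could be documented, shown on a simplified standalone version that takes creation times directly instead of `SegmentZKMetadata` and receives `now` as a parameter (both hypothetical simplifications). The sketch also divides before narrowing: in the quoted `(int) (now - creationTime) / 60_000`, the cast binds tighter than `/`, so the millisecond difference is truncated to `int` first and any age above `Integer.MAX_VALUE` ms (about 24.8 days) wraps around:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class SegmentAges {
  /**
   * Computes the age, in whole minutes, of each consuming segment from its creation time.
   * Segments whose creation time is negative (not recorded) are skipped.
   *
   * @param creationTimeMs segment name -> creation time in epoch millis
   * @param nowMs current time in epoch millis, passed in to keep the method testable
   * @return segment name -> age in minutes
   */
  static Map<String, Integer> ageInMinutes(Map<String, Long> creationTimeMs, long nowMs) {
    Map<String, Integer> ages = new HashMap<>();
    creationTimeMs.forEach((segment, creationTime) -> {
      if (creationTime < 0) {
        return; // the quoted code logs a warning here
      }
      // Convert in long arithmetic first, then narrow the (much smaller) minute count to int.
      ages.put(segment, (int) TimeUnit.MILLISECONDS.toMinutes(nowMs - creationTime));
    });
    return ages;
  }
}
```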
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+ Map<String, Integer> oldestTenSegment;
Review Comment:
nit: let's rename: `oldestConsumingSegments`
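A sketch of the renamed selection. As quoted, the ages are sorted with `Map.Entry.comparingByValue()`, which is ascending, so `limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)` keeps the youngest segments; picking the oldest needs a descending sort. Class and method names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class OldestSegments {
  /** Up to n consuming segments with the largest age in minutes, oldest first. */
  static Map<String, Integer> oldestConsumingSegments(Map<String, Integer> consumingSegmentsAge, int n) {
    Map<String, Integer> oldestConsumingSegments = new LinkedHashMap<>();
    consumingSegmentsAge.entrySet().stream()
        // Descending by age; ascending order would keep the youngest segments instead.
        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
        .limit(n)
        .forEachOrdered(e -> oldestConsumingSegments.put(e.getKey(), e.getValue()));
    return oldestConsumingSegments;
  }
}
```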
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+ private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ if (consumingSegmentZKMetadata.isEmpty()) {
+ LOGGER.info("No consuming segments being moved for table: {}", tableNameWithType);
+ return new HashMap<>();
+ }
+ ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+ if (consumingSegmentInfoReader == null) {
+ LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+ tableNameWithType);
+ return null;
+ }
+ Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+ try {
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+ consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+ for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
+ String segmentName = entry.getKey();
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList =
+ consumingSegmentsInfoMap._segmentToConsumingInfoMap.getOrDefault(segmentName, null);
+ SegmentZKMetadata segmentZKMetadata = entry.getValue();
+ if (segmentZKMetadata == null) {
+ LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+ return null;
+ }
+ String startOffset = segmentZKMetadata.getStartOffset();
+ if (startOffset == null) {
+ LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+ return null;
+ }
+ if (consumingSegmentInfoList != null && !consumingSegmentInfoList.isEmpty()) {
+ // this value should be the same regardless of which server the consuming segment info is from, use the
+ // first in the list here
+ int offsetsToCatchUp =
+ consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()
Review Comment:
Should there be a null check for `_partitionOffsetInfo` and
`_latestUpstreamOffsetMap`? (I know I ran into `_latestUpstreamOffsetMap` being
null for Kinesis.)
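A null-safe sketch of the catch-up computation. The quoted line is truncated after `.values()`, so the arithmetic here (latest upstream offset minus the segment's start offset, summed over the map's entries) is an assumption, as is treating offsets as numeric strings, which holds for Kafka. The `Map` parameter stands in for `_partitionOffsetInfo._latestUpstreamOffsetMap`; a caller would pass `null` when `_partitionOffsetInfo` itself is null. Returning an empty `OptionalLong` lets the caller take the existing "return null" path. Class and method names are hypothetical:

```java
import java.util.Map;
import java.util.OptionalLong;

final class CatchUpOffsets {
  /**
   * Offsets still to consume for one segment, or empty if they cannot be determined.
   *
   * @param latestUpstreamOffsetMap partition -> latest upstream offset; may be null
   * @param startOffset the segment's start offset from ZK metadata; may be null
   */
  static OptionalLong offsetsToCatchUp(Map<String, String> latestUpstreamOffsetMap, String startOffset) {
    if (latestUpstreamOffsetMap == null || latestUpstreamOffsetMap.isEmpty() || startOffset == null) {
      return OptionalLong.empty();
    }
    try {
      long start = Long.parseLong(startOffset);
      long total = 0;
      for (String latest : latestUpstreamOffsetMap.values()) {
        if (latest == null) {
          return OptionalLong.empty();
        }
        total += Long.parseLong(latest) - start;
      }
      return OptionalLong.of(total);
    } catch (NumberFormatException e) {
      // Non-numeric or out-of-range offsets: treat as unknown rather than failing the dry run.
      return OptionalLong.empty();
    }
  }
}
```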
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +781,148 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
Review Comment:
would it make sense to skip calling `getConsumingSegmentSummary` altogether
if `newServersToConsumingSegmentMap` is empty?
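A sketch of the early exit, on a hypothetical reduced summary type (not the real `ConsumingSegmentToBeMovedSummary`). Returning zeros rather than `null` when nothing moves is a design choice of this sketch: it keeps "offline table" (null) distinguishable from "no consuming segments to move". The count uses `mapToInt(Set::size).sum()`, which gives the same result as the quoted `reduce(0, (a, b) -> a + b.size(), Integer::sum)`:

```java
import java.util.Map;
import java.util.Set;

final class SummaryGuard {
  /** Hypothetical stand-in for ConsumingSegmentToBeMovedSummary, reduced to two counts. */
  static final class Summary {
    final int numConsumingSegmentsToBeMoved;
    final int numServers;

    Summary(int numConsumingSegmentsToBeMoved, int numServers) {
      this.numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
      this.numServers = numServers;
    }
  }

  static Summary consumingSegmentSummary(boolean isOfflineTable,
      Map<String, Set<String>> newServersToConsumingSegmentMap) {
    if (isOfflineTable) {
      return null; // same as the quoted code: no consuming-segment summary for offline tables
    }
    if (newServersToConsumingSegmentMap.isEmpty()) {
      // Nothing to look up: skip the ZK metadata reads and the server queries entirely.
      return new Summary(0, 0);
    }
    int numConsumingSegmentsToBeMoved =
        newServersToConsumingSegmentMap.values().stream().mapToInt(Set::size).sum();
    // In the real method, the ZK metadata reads and offset/age lookups would follow here.
    return new Summary(numConsumingSegmentsToBeMoved, newServersToConsumingSegmentMap.size());
  }
}
```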
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -243,6 +244,8 @@ private enum LineageUpdateType {
private final LineageManager _lineageManager;
private final RebalancePreChecker _rebalancePreChecker;
private TableSizeReader _tableSizeReader;
+ private final ExecutorService _executorService;
+ private HttpClientConnectionManager _connectionManager;
Review Comment:
cc @xiangfu0 @Jackie-Jiang @klsince
We wanted to know what the recommendation is for passing some of these
objects. I see that both `_executorService` and `_connectionManager` are
available in the Restlet classes. Should we always pass these down from
there when handling APIs?
And for code paths not called from a Restlet, do we just create these and pass them in?
Or is the way it's done in this PR fine?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]