somandal commented on code in PR #15050:
URL: https://github.com/apache/pinot/pull/15050#discussion_r1960750727
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,147 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
}
}
+ /**
+  * Looks up the reported size of a single replica of the given table.
+  *
+  * @param tableNameWithType name of the table (with type) whose size is requested
+  * @return the reported per-replica size in bytes, or -1 when no table size reader is available or the lookup
+  *         fails with an {@code InvalidConfigException}
+  */
+ private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+   if (_tableSizeReader == null) {
+     LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType);
+     return -1;
+   }
+   try {
+     // NOTE(review): 30_000 is the second argument of getTableSubtypeSize - presumably a timeout in ms, confirm
+     return _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000)._reportedSizePerReplicaInBytes;
+   } catch (InvalidConfigException e) {
+     // Best effort: the failure is logged and reported to the caller as the -1 sentinel
+     LOGGER.error(String.format("Caught exception while trying to fetch table size details for table: %s",
+         tableNameWithType), e);
+     return -1;
+   }
+ }
+
+ /**
+  * Builds the dry-run summary of a rebalance: the change in replication factor and number of servers, the
+  * per-server segment changes, the segment counts, and an estimate of how much data has to be moved.
+  *
+  * <p>Size figures are derived from the per-replica table size. A non-positive table size (0, or the -1 sentinel
+  * returned when the size cannot be fetched) is passed through unchanged as both the average segment size and
+  * the estimated data to be moved. If a positive size is reported while the current assignment has no segments,
+  * both figures are reported as -1 rather than dividing by zero.
+  *
+  * @param currentAssignment current assignment, keyed by segment name and then by server instance name
+  * @param targetAssignment target assignment, in the same shape as {@code currentAssignment}
+  * @param tableNameWithType name of the table (with type) being rebalanced
+  * @param rebalanceJobId id of the rebalance job; only used for logging here
+  * @return the summary of server-level and segment-level changes
+  */
+ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment,
+     Map<String, Map<String, String>> targetAssignment, String tableNameWithType, String rebalanceJobId) {
+   LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
+       tableNameWithType, rebalanceJobId);
+   int existingReplicationFactor = 0;
+   int newReplicationFactor = 0;
+   Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+   Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+   // Invert segment -> servers into server -> segments. The replication factor is overwritten on every iteration,
+   // so it ends up being the number of servers hosting the last segment iterated.
+   for (Map.Entry<String, Map<String, String>> segmentEntry : currentAssignment.entrySet()) {
+     String segment = segmentEntry.getKey();
+     existingReplicationFactor = segmentEntry.getValue().size();
+     for (String server : segmentEntry.getValue().keySet()) {
+       existingServersToSegmentMap.computeIfAbsent(server, k -> new HashSet<>()).add(segment);
+     }
+   }
+
+   for (Map.Entry<String, Map<String, String>> segmentEntry : targetAssignment.entrySet()) {
+     String segment = segmentEntry.getKey();
+     newReplicationFactor = segmentEntry.getValue().size();
+     for (String server : segmentEntry.getValue().keySet()) {
+       newServersToSegmentMap.computeIfAbsent(server, k -> new HashSet<>()).add(segment);
+     }
+   }
+   RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+       = new RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, newReplicationFactor);
+
+   int existingNumServers = existingServersToSegmentMap.size();
+   int newNumServers = newServersToSegmentMap.size();
+   RebalanceSummaryResult.RebalanceChangeInfo numServers
+       = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, newNumServers);
+
+   List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues(
+       _helixDataAccessor.keyBuilder().instanceConfigs(), true);
+   Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+   for (InstanceConfig instanceConfig : instanceConfigs) {
+     instanceToTagsMap.put(instanceConfig.getInstanceName(), instanceConfig.getTags());
+   }
+
+   Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>();
+   int segmentsNotMoved = 0;
+   int numServersGettingDataAdded = 0;
+   // Servers present in the target assignment: ADDED when they host nothing today, UNCHANGED otherwise
+   for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+     String server = entry.getKey();
+     Set<String> newSegments = entry.getValue();
+     Set<String> existingSegments = existingServersToSegmentMap.get(server);
+
+     RebalanceSummaryResult.ServerStatus serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+     int totalExistingSegments = 0;
+     int segmentsUnchanged = 0;
+     if (existingSegments != null) {
+       serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+       totalExistingSegments = existingSegments.size();
+       Set<String> intersection = new HashSet<>(existingSegments);
+       intersection.retainAll(newSegments);
+       segmentsUnchanged = intersection.size();
+       segmentsNotMoved += segmentsUnchanged;
+     }
+     // Added = target segments not already hosted; deleted = hosted segments absent from the target
+     int totalNewSegments = newSegments.size();
+     int segmentsAdded = totalNewSegments - segmentsUnchanged;
+     int segmentsDeleted = totalExistingSegments - segmentsUnchanged;
+     if (segmentsAdded > 0) {
+       numServersGettingDataAdded++;
+     }
+
+     serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus,
+         totalNewSegments, totalExistingSegments, segmentsAdded, segmentsDeleted, segmentsUnchanged,
+         instanceToTagsMap.get(server)));
+   }
+
+   // Servers that only appear in the current assignment lose all of their segments
+   for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
+     String server = entry.getKey();
+     if (!serverSegmentChangeInfoMap.containsKey(server)) {
+       int numSegments = entry.getValue().size();
+       serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(
+           RebalanceSummaryResult.ServerStatus.REMOVED, 0, numSegments, 0, numSegments, 0,
+           instanceToTagsMap.get(server)));
+     }
+   }
+
+   int numExistingSegments = currentAssignment.size();
+   int numNewSegments = targetAssignment.size();
+   RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica
+       = new RebalanceSummaryResult.RebalanceChangeInfo(numExistingSegments, numNewSegments);
+
+   int existingNumberSegmentsTotal = existingReplicationFactor * numExistingSegments;
+   int newNumberSegmentsTotal = newReplicationFactor * numNewSegments;
+   RebalanceSummaryResult.RebalanceChangeInfo numSegmentsAcrossAllReplicas
+       = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumberSegmentsTotal, newNumberSegmentsTotal);
+
+   int totalSegmentsToBeMoved = newNumberSegmentsTotal - segmentsNotMoved;
+
+   long tableSizePerReplicaInBytes = calculateTableSizePerReplicaInBytes(tableNameWithType);
+   long averageSegmentSizeInBytes;
+   long totalEstimatedDataToBeMovedInBytes;
+   if (tableSizePerReplicaInBytes <= 0) {
+     // Unknown (-1) or empty (0) table size is passed through as-is
+     averageSegmentSizeInBytes = tableSizePerReplicaInBytes;
+     totalEstimatedDataToBeMovedInBytes = tableSizePerReplicaInBytes;
+   } else if (numExistingSegments == 0) {
+     // A size was reported but there are no segments to average over: report unknown instead of dividing by zero
+     averageSegmentSizeInBytes = -1;
+     totalEstimatedDataToBeMovedInBytes = -1;
+   } else {
+     averageSegmentSizeInBytes = tableSizePerReplicaInBytes / ((long) numExistingSegments);
+     totalEstimatedDataToBeMovedInBytes = ((long) totalSegmentsToBeMoved) * averageSegmentSizeInBytes;
+   }
+   double estimatedTimeToRebalanceInSec = getEstimatedTimeToRebalanceInSec(totalEstimatedDataToBeMovedInBytes,
+       numServersGettingDataAdded);
+
+   RebalanceSummaryResult.ServerInfo serverInfo = new RebalanceSummaryResult.ServerInfo(numServersGettingDataAdded,
+       numServers, serverSegmentChangeInfoMap);
+   RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
+       averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes, estimatedTimeToRebalanceInSec,
+       replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+
+   LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
+       rebalanceJobId);
+   return new RebalanceSummaryResult(serverInfo, segmentInfo);
+ }
+
+ private static double getEstimatedTimeToRebalanceInSec(long
totalEstimatedDataToBeMovedInBytes,
+ int numServersGettingDataAdded) {
+ double estimatedTimeToRebalanceInSec = totalEstimatedDataToBeMovedInBytes
== 0 ? 0.0 : -1.0;
+ if (totalEstimatedDataToBeMovedInBytes > 0 && numServersGettingDataAdded >
0) {
+ // Do some processing to figure out what the time to download might be
+ // Assume that data is evenly distributed across all servers
+ long totalDataPerServerInBytes = totalEstimatedDataToBeMovedInBytes /
numServersGettingDataAdded;
+ // TODO: Pick a good threshold to calculate estimated time to rebalance.
For now assume 100 MB/s data download
+ // + process rate
+ estimatedTimeToRebalanceInSec = ((double) totalDataPerServerInBytes) /
(100.0D * 1024.0D * 1024.0D);
Review Comment:
Yeah, this is a tricky field for sure. I've removed it for now and added a
TODO to add it back once we have a better understanding of what a good estimate
looks like. This field was requested for this summary PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]