somandal commented on code in PR #15050:
URL: https://github.com/apache/pinot/pull/15050#discussion_r1962235392
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,146 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
}
}
+ private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+ long tableSizePerReplicaInBytes = -1;
+ if (_tableSizeReader == null) {
+ LOGGER.warn("tableSizeReader is null, cannot calculate table size for
table {}!", tableNameWithType);
+ return tableSizePerReplicaInBytes;
+ }
+ LOGGER.info("Fetching the table size for rebalance summary for table: {}",
tableNameWithType);
+ try {
+ // TODO: Consider making the timeoutMs for fetching table size via table
rebalancer configurable
+ TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+ _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+ tableSizePerReplicaInBytes =
tableSizeDetails._reportedSizePerReplicaInBytes;
+ } catch (InvalidConfigException e) {
+ LOGGER.error("Caught exception while trying to fetch table size details
for table: {}", tableNameWithType, e);
+ }
+ LOGGER.info("Fetched the table size for rebalance summary for table: {}",
tableNameWithType);
+ return tableSizePerReplicaInBytes;
+ }
+
+ private RebalanceSummaryResult calculateDryRunSummary(Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment, String
tableNameWithType, String rebalanceJobId) {
+ LOGGER.info("Calculating rebalance summary for table: {} with
rebalanceJobId: {}",
+ tableNameWithType, rebalanceJobId);
+ int existingReplicationFactor = 0;
+ int newReplicationFactor = 0;
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
+ existingReplicationFactor = entrySet.getValue().size();
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ newReplicationFactor = entrySet.getValue().size();
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+ RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+ = new
RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor,
newReplicationFactor);
+
+ int existingNumServers = existingServersToSegmentMap.size();
+ int newNumServers = newServersToSegmentMap.size();
+ RebalanceSummaryResult.RebalanceChangeInfo numServers
+ = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers,
newNumServers);
+
+ List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues(
+ _helixDataAccessor.keyBuilder().instanceConfigs(), true);
+ Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ instanceToTagsMap.put(instanceConfig.getInstanceName(),
instanceConfig.getTags());
+ }
+
+ Set<String> serversAdded = new HashSet<>();
+ Set<String> serversRemoved = new HashSet<>();
+ Set<String> serversUnchanged = new HashSet<>();
+ Set<String> serversGettingNewSegments = new HashSet<>();
+ Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo>
serverSegmentChangeInfoMap = new HashMap<>();
+ int segmentsNotMoved = 0;
+ int maxSegmentsAddedToServer = 0;
+ for (Map.Entry<String, Set<String>> entry :
newServersToSegmentMap.entrySet()) {
+ String server = entry.getKey();
+ Set<String> segmentSet = entry.getValue();
+ int totalNewSegments = segmentSet.size();
+
+ Set<String> newSegmentList = new HashSet<>(segmentSet);
Review Comment:
oops, done
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,146 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
}
}
+ private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+ long tableSizePerReplicaInBytes = -1;
+ if (_tableSizeReader == null) {
+ LOGGER.warn("tableSizeReader is null, cannot calculate table size for
table {}!", tableNameWithType);
+ return tableSizePerReplicaInBytes;
+ }
+ LOGGER.info("Fetching the table size for rebalance summary for table: {}",
tableNameWithType);
+ try {
+ // TODO: Consider making the timeoutMs for fetching table size via table
rebalancer configurable
+ TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+ _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+ tableSizePerReplicaInBytes =
tableSizeDetails._reportedSizePerReplicaInBytes;
+ } catch (InvalidConfigException e) {
+ LOGGER.error("Caught exception while trying to fetch table size details
for table: {}", tableNameWithType, e);
+ }
+ LOGGER.info("Fetched the table size for rebalance summary for table: {}",
tableNameWithType);
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]