somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2021740211
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -120,16 +130,22 @@
*/
public class TableRebalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalancer.class);
+ private static final int TOP_N_IN_CONSUMING_SEGMENT_SUMMARY = 10;
private final HelixManager _helixManager;
private final HelixDataAccessor _helixDataAccessor;
private final TableRebalanceObserver _tableRebalanceObserver;
private final ControllerMetrics _controllerMetrics;
private final RebalancePreChecker _rebalancePreChecker;
private final TableSizeReader _tableSizeReader;
+ private final ExecutorService _executorService;
+ private final HttpClientConnectionManager _connectionManager;
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver,
@Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker,
- @Nullable TableSizeReader tableSizeReader) {
+ @Nullable TableSizeReader tableSizeReader, @Nullable ExecutorService executorService,
Review Comment:
nit: Since we now pass in `PinotHelixResourceManager`, can we get the
`TableSizeReader` from it directly rather than passing it in separately?
Similarly, check whether any of the other parameters here can be fetched from
`PinotHelixResourceManager` directly, to keep the constructor's parameter list
cleaner.
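A minimal sketch of the suggestion, assuming the resource manager can hand out the shared dependencies. `ResourceManager`, `SizeReader`, `getSizeReader()`, `getExecutor()` and `Rebalancer` are hypothetical stand-ins, not Pinot's real classes or methods; whether `PinotHelixResourceManager` exposes equivalent getters would still need to be checked.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConstructorSketch {
  // Hypothetical stand-in for TableSizeReader.
  static final class SizeReader {
  }

  // Hypothetical stand-in for PinotHelixResourceManager: it owns the shared dependencies.
  static final class ResourceManager {
    private final SizeReader _sizeReader = new SizeReader();
    private final ExecutorService _executor;

    ResourceManager(ExecutorService executor) {
      _executor = executor;
    }

    SizeReader getSizeReader() {
      return _sizeReader;
    }

    ExecutorService getExecutor() {
      return _executor;
    }
  }

  // Hypothetical rebalancer: takes only the resource manager and derives the rest from it.
  static final class Rebalancer {
    private final ResourceManager _resourceManager;
    private final SizeReader _sizeReader;
    private final ExecutorService _executor;

    Rebalancer(ResourceManager resourceManager) {
      _resourceManager = Objects.requireNonNull(resourceManager, "resourceManager");
      _sizeReader = resourceManager.getSizeReader();
      _executor = resourceManager.getExecutor();
    }

    SizeReader sizeReader() {
      return _sizeReader;
    }

    ExecutorService executor() {
      return _executor;
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    ResourceManager manager = new ResourceManager(executor);
    Rebalancer rebalancer = new Rebalancer(manager);
    System.out.println(rebalancer.sizeReader() == manager.getSizeReader()); // prints true
    executor.shutdown();
  }
}
```

The trade-off is tighter coupling to the resource manager in exchange for a shorter, harder-to-misorder parameter list.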
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -1570,6 +1580,269 @@ public void testRebalanceWithMinimizeDataMovementInstanceAssignments()
}
}
+ @Test
+ public void testRebalanceConsumingSegmentSummary()
Review Comment:
For tests, is it possible to update
`OfflineClusterIntegrationTest::testRebalanceDryRunSummary()` with asserts that
the consuming segment summary is null, just to ensure we have that covered for
OFFLINE tables?
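A standalone sketch of the kind of assertion being requested. `ConsumingSummary` and `buildConsumingSummary` are hypothetical, and inferring the table type from the `_OFFLINE` suffix is a simplification of `TableNameBuilder.getTableTypeFromTableName`; in the real test, TestNG's `assertNull` on the actual summary getter would replace the manual check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OfflineSummarySketch {
  // Hypothetical stand-in for the consuming-segment part of the dry-run summary.
  static final class ConsumingSummary {
    final Map<String, Set<String>> serverToConsumingSegments;

    ConsumingSummary(Map<String, Set<String>> serverToConsumingSegments) {
      this.serverToConsumingSegments = serverToConsumingSegments;
    }
  }

  // Mirrors the diff: OFFLINE tables skip consuming-segment bookkeeping, so the summary is null.
  static ConsumingSummary buildConsumingSummary(String tableNameWithType) {
    if (tableNameWithType.endsWith("_OFFLINE")) {
      return null;
    }
    // REALTIME tables: the real code fills this from the segment assignment; left empty here.
    return new ConsumingSummary(new HashMap<>());
  }

  public static void main(String[] args) {
    // The assertion the reviewer asks for, expressed without a test framework.
    if (buildConsumingSummary("myTable_OFFLINE") != null) {
      throw new AssertionError("consuming segment summary should be null for OFFLINE tables");
    }
    System.out.println("OFFLINE check passed");
  }
}
```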
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -627,22 +653,42 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails,
TableConfig tableConfig) {
LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
int existingReplicationFactor = 0;
int newReplicationFactor = 0;
Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+ Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
existingReplicationFactor = entrySet.getValue().size();
- for (String segmentKey : entrySet.getValue().keySet()) {
- existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+ String segmentName = entrySet.getKey();
+ boolean isSegmentConsuming = existingServersToConsumingSegmentMap != null && entrySet.getValue()
+ .values()
+ .stream()
+ .allMatch(state -> state.equals(SegmentStateModel.CONSUMING));
Review Comment:
I looked into this a bit more today for another change I'm making. Based on
my understanding, the logic should be:
- If any replica is ONLINE, skip this segment (it is an ONLINE segment).
- Otherwise, if any replica is CONSUMING, pick this segment.
This handles the corner case where no replica is ONLINE but some replica is in
the OFFLINE or ERROR state, which the current `allMatch(CONSUMING)` check misses.
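A minimal sketch of the proposed classification over one segment's instance-to-state map (the value type of `currentAssignment` in the diff). The string constants stand in for `SegmentStateModel.ONLINE` and `SegmentStateModel.CONSUMING`, assumed here to be "ONLINE" and "CONSUMING"; `ConsumingSegmentCheck` and its method names are hypothetical.

```java
import java.util.Map;

public class ConsumingSegmentCheck {
  static final String ONLINE = "ONLINE";
  static final String CONSUMING = "CONSUMING";

  // Proposed rule: any ONLINE replica means the segment is treated as ONLINE (not consuming);
  // otherwise any CONSUMING replica marks it as consuming, even if others are OFFLINE or ERROR.
  static boolean isConsumingSegment(Map<String, String> instanceStateMap) {
    if (instanceStateMap.containsValue(ONLINE)) {
      return false;
    }
    return instanceStateMap.containsValue(CONSUMING);
  }

  // The check currently in the diff, for comparison: every replica must be CONSUMING.
  static boolean allReplicasConsuming(Map<String, String> instanceStateMap) {
    return instanceStateMap.values().stream().allMatch(CONSUMING::equals);
  }

  public static void main(String[] args) {
    Map<String, String> mixed = Map.of("server_0", "CONSUMING", "server_1", "ERROR");
    System.out.println(isConsumingSegment(mixed));   // true
    System.out.println(allReplicasConsuming(mixed)); // false: the corner case described above
  }
}
```

Checking ONLINE first keeps the two rules ordered exactly as listed, so a segment with both ONLINE and CONSUMING replicas is never counted as consuming.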
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -140,10 +156,20 @@ public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserv
_controllerMetrics = controllerMetrics;
_rebalancePreChecker = rebalancePreChecker;
_tableSizeReader = tableSizeReader;
+ _executorService = executorService;
+ _connectionManager = connectionManager;
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ }
+
+ public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver,
Review Comment:
nit: is this constructor really needed?
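If the shorter overload does turn out to be needed, one common way to keep a single initialization path is to have it delegate with `this(...)`, passing `null` for the optional extras. A sketch with a hypothetical `Rebalancer` and simplified parameter types, not the PR's actual signatures.

```java
import java.util.concurrent.ExecutorService;

public class DelegatingConstructorSketch {
  // Hypothetical rebalancer with a full constructor and a convenience overload.
  static final class Rebalancer {
    private final String _clusterName;
    private final ExecutorService _executorService; // optional, may be null

    // Full constructor: the single place where fields are assigned.
    Rebalancer(String clusterName, ExecutorService executorService) {
      _clusterName = clusterName;
      _executorService = executorService;
    }

    // Convenience overload: delegates instead of repeating the assignments.
    Rebalancer(String clusterName) {
      this(clusterName, null);
    }

    String clusterName() {
      return _clusterName;
    }

    boolean hasExecutor() {
      return _executorService != null;
    }
  }

  public static void main(String[] args) {
    System.out.println(new Rebalancer("cluster").hasExecutor()); // false
  }
}
```

If every caller can supply the full argument list, dropping the overload entirely is simpler still.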
--
This is an automated message from the Apache Git Service.