yashmayya commented on code in PR #17515:
URL: https://github.com/apache/pinot/pull/17515#discussion_r2743803174
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -157,22 +159,25 @@ public class TableRebalancer {
private final RebalancePreChecker _rebalancePreChecker;
private final TableSizeReader _tableSizeReader;
private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private final boolean _updateIdealStateInstancePartitions;
public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver,
@Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker,
@Nullable TableSizeReader tableSizeReader,
- @Nullable PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ @Nullable PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+ boolean updateIdealStateInstancePartitions) {
_helixManager = helixManager;
_tableRebalanceObserver = Objects.requireNonNullElseGet(tableRebalanceObserver, NoOpTableRebalanceObserver::new);
_helixDataAccessor = helixManager.getHelixDataAccessor();
_controllerMetrics = controllerMetrics;
_rebalancePreChecker = rebalancePreChecker;
_tableSizeReader = tableSizeReader;
_pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ _updateIdealStateInstancePartitions = updateIdealStateInstancePartitions;
}
public TableRebalancer(HelixManager helixManager) {
- this(helixManager, null, null, null, null, null);
+ this(helixManager, null, null, null, null, null, true);
Review Comment:
This path is mainly used during testing, so I think it makes sense to keep it enabled to help catch any regressions?
##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java:
##########
@@ -113,6 +114,12 @@ public List<String> getInstances(int partitionId, int replicaGroupId) {
.get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR
+ replicaGroupId);
}
+ /// Given a partition ID and replica group ID like "0_0", return the list of instances belonging to that instance
+ /// partition
+ public List<String> getInstances(String partitionReplica) {
Review Comment:
After the other update, it's now only used in tests.
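For context on the key format: the existing `getInstances(int, int)` looks up `Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId`, and the new overload accepts that composite key (e.g. `"0_0"`) directly. Below is a minimal sketch of the two lookups. It assumes the separator is `"_"`, as the Javadoc example suggests. `SimpleInstancePartitions` is a hypothetical stand-in, not Pinot's `InstancePartitions` class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for an instance-partitions holder (not Pinot's InstancePartitions).
class SimpleInstancePartitions {
  // Assumed separator, inferred from the "0_0" example in the Javadoc.
  static final String PARTITION_REPLICA_GROUP_SEPARATOR = "_";

  // "<partitionId>_<replicaGroupId>" -> instances in that partition/replica group
  private final Map<String, List<String>> _partitionToInstancesMap = new HashMap<>();

  void setInstances(int partitionId, int replicaGroupId, List<String> instances) {
    _partitionToInstancesMap.put(key(partitionId, replicaGroupId), instances);
  }

  // Structured lookup: builds the composite key from its two parts.
  List<String> getInstances(int partitionId, int replicaGroupId) {
    return _partitionToInstancesMap.get(key(partitionId, replicaGroupId));
  }

  // Raw-key lookup, e.g. getInstances("0_0"); handy in tests that already hold the key.
  List<String> getInstances(String partitionReplica) {
    return _partitionToInstancesMap.get(partitionReplica);
  }

  private static String key(int partitionId, int replicaGroupId) {
    return Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId;
  }
}
```

In this sketch, both overloads return `null` for a key that was never set.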
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1854,7 +1809,7 @@ public void addTable(TableConfig tableConfig, List<Pair<PartitionGroupMetadata,
Preconditions.checkState(tableConfig != null, "Failed to read table config for table: %s", tableNameWithType);
// Assign instances
- assignInstances(tableConfig, true);
+ assignInstances(tableConfig, idealState, true);
Review Comment:
Hm, I can revert it for the update-table path (when `override` is false). That path only does instance assignment for new instance partitions, so it can rely on table rebalance to update the metadata, as in the other paths. But for new tables this is required, since they wouldn't otherwise have the ideal state instance partitions metadata.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -2242,4 +2303,15 @@ private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
tableRebalanceLogger.info("Successfully force committed {} consuming segments", segmentsToCommit.size());
return _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
}
+
+ private IdealState replaceInstancePartitionsInIdealState(String tableNameWithType, IdealState currentIdealState,
+ List<InstancePartitions> instancePartitionsList) {
+ Map<String, List<String>> idealStateListFields = currentIdealState.getRecord().getListFields();
+ InstancePartitionsUtils.replaceInstancePartitionsInIdealState(currentIdealState, instancePartitionsList);
+
+ return HelixHelper.updateIdealState(_helixManager, tableNameWithType, is -> {
Review Comment:
There are three callers of this method:
1. When `segmentAssignmentUnchanged` is true but the instance partitions have changed, this method is called to update the ideal state instance partitions just before completing the rebalance. I think this one is safe.
2. At the end of the rebalance, when the current assignment matches the target assignment. I think this one is safe too?
3. Before starting the actual rebalance. I guess this is the one you're concerned about?

I've updated the wipe-out logic so that it's performed alongside the IS change. The restoration at the end is still separate, though, because there could be cases where the assignment reaches the target assignment outside of the rebalance-initiated IS updates.
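Performing the wipe alongside the IS change means both land in a single write. If that write fails, neither change is applied. The in-memory sketch below illustrates the idea. It assumes a read-modify-write helper that applies an updater to a copy and publishes the copy only if the updater returns normally. This is loosely in the spirit of the `HelixHelper.updateIdealState(..., is -> {...})` call in the diff, but it is not that helper's implementation. `FakeIdealState`, `FakeIdealStateStore`, `WipeAlongsideAssignment`, and the `INSTANCE_PARTITIONS__` key prefix are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical in-memory stand-in for an ideal state; not Helix's IdealState or ZNRecord.
class FakeIdealState {
  // segment -> (instance -> state), i.e. the segment assignment
  final Map<String, Map<String, String>> mapFields = new HashMap<>();
  // metadata lists; instance partitions are assumed to live here under a key prefix
  final Map<String, List<String>> listFields = new HashMap<>();

  FakeIdealState copy() {
    FakeIdealState c = new FakeIdealState();
    mapFields.forEach((segment, states) -> c.mapFields.put(segment, new HashMap<>(states)));
    c.listFields.putAll(listFields);  // lists are replaced, never mutated, in this sketch
    return c;
  }
}

// Hypothetical single-writer store: the updater runs on a copy, and the copy is
// published only if the updater returns normally, so each update is all-or-nothing.
class FakeIdealStateStore {
  private FakeIdealState _current = new FakeIdealState();

  synchronized FakeIdealState get() {
    return _current;
  }

  synchronized FakeIdealState update(UnaryOperator<FakeIdealState> updater) {
    FakeIdealState next = updater.apply(_current.copy());
    _current = next;
    return next;
  }
}

class WipeAlongsideAssignment {
  // Hypothetical key prefix for instance partitions metadata.
  static final String INSTANCE_PARTITIONS_PREFIX = "INSTANCE_PARTITIONS__";

  // Writes the next segment assignment and drops the instance partitions metadata in the
  // same update, so a failure cannot leave one change applied without the other.
  static UnaryOperator<FakeIdealState> nextStep(Map<String, Map<String, String>> nextAssignment) {
    return is -> {
      is.mapFields.clear();
      nextAssignment.forEach((segment, states) -> is.mapFields.put(segment, new HashMap<>(states)));
      is.listFields.keySet().removeIf(key -> key.startsWith(INSTANCE_PARTITIONS_PREFIX));
      return is;
    };
  }
}
```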
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -296,6 +301,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
"Cannot rebalance disabled table without downtime", null, null, null, null, null);
}
+ // Wipe out ideal state instance partitions metadata
Review Comment:
> If we wipe it here, and following part throws exception, we might end up with an IS without instance partitions

That would cause the rebalance to fail, in which case it would be retried anyway, right?

> E.g. when segmentAssignmentUnchanged, we should check if instance partitions changed, then modify accordingly.

We're already updating the ideal state instance partitions when `segmentAssignmentUnchanged` is true but the instance partitions have changed. Good point about `segmentAssignmentUnchanged`, though: when the instance partitions haven't changed either, I think the old logic would have wiped out the ideal state instance partitions. I've updated the logic.
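The case analysis described in this thread can be summarized as a small decision helper. This is a hypothetical sketch: the enum and method names are illustrative and not taken from `TableRebalancer`. The `featureEnabled` parameter stands in for the `_updateIdealStateInstancePartitions` flag.

```java
// Hypothetical summary of how each case is handled; names are illustrative only.
enum InstancePartitionsAction {
  LEAVE_AS_IS,       // nothing to do for the ideal state metadata
  REPLACE_NOW,       // assignment unchanged, partitions changed: replace just before completing
  WIPE_THEN_RESTORE  // assignment changes: wipe with the first IS update, restore once the target is reached
}

class InstancePartitionsPolicy {
  static InstancePartitionsAction decide(boolean featureEnabled, boolean segmentAssignmentUnchanged,
      boolean instancePartitionsUnchanged) {
    if (!featureEnabled) {
      return InstancePartitionsAction.LEAVE_AS_IS;
    }
    if (segmentAssignmentUnchanged) {
      // The case fixed above: with no partition change either, nothing should be wiped.
      return instancePartitionsUnchanged ? InstancePartitionsAction.LEAVE_AS_IS
          : InstancePartitionsAction.REPLACE_NOW;
    }
    return InstancePartitionsAction.WIPE_THEN_RESTORE;
  }
}
```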
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -722,6 +764,25 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
tableRebalanceLogger.info(msg);
// Record completion
_tableRebalanceObserver.onSuccess(msg);
+
+ if (_updateIdealStateInstancePartitions) {
+ // Rebalance completed successfully, so we can update the instance partitions in the ideal state to reflect
+ // the new set of instance partitions.
+ List<InstancePartitions> instancePartitionsList = new ArrayList<>(instancePartitionsMap.values());
Review Comment:
The existing one should've been wiped out before this point though.
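Whether or not the earlier wipe ran, the outcome at this point can be made independent of it. To do so, the replace step clears any previously stored instance-partitions entries before writing the new ones. Below is a minimal sketch under a hypothetical key scheme (`INSTANCE_PARTITIONS__<name>__<partition>_<replicaGroup>`). This is not necessarily the format that `InstancePartitionsUtils.replaceInstancePartitionsInIdealState` uses, and `NamedPartitions` and `InstancePartitionsListFields` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified instance partitions: a name plus "<partition>_<replicaGroup>" -> instances.
final class NamedPartitions {
  final String name;
  final Map<String, List<String>> partitionToInstances;

  NamedPartitions(String name, Map<String, List<String>> partitionToInstances) {
    this.name = name;
    this.partitionToInstances = partitionToInstances;
  }
}

class InstancePartitionsListFields {
  // Hypothetical key scheme: INSTANCE_PARTITIONS__<name>__<partition>_<replicaGroup>
  static final String PREFIX = "INSTANCE_PARTITIONS__";

  // Clears every stored instance-partitions entry, then writes the new ones. Because it
  // clears first, the result is the same whether or not an earlier wipe already ran.
  static void replace(Map<String, List<String>> listFields, List<NamedPartitions> partitionsList) {
    listFields.keySet().removeIf(key -> key.startsWith(PREFIX));
    for (NamedPartitions partitions : partitionsList) {
      partitions.partitionToInstances.forEach((partitionReplica, instances) ->
          listFields.put(PREFIX + partitions.name + "__" + partitionReplica, new ArrayList<>(instances)));
    }
  }
}
```

Entries outside the prefix (other ideal state metadata) are left untouched by this sketch.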
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.