yashmayya commented on code in PR #17515:
URL: https://github.com/apache/pinot/pull/17515#discussion_r2743803174


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -157,22 +159,25 @@ public class TableRebalancer {
   private final RebalancePreChecker _rebalancePreChecker;
   private final TableSizeReader _tableSizeReader;
   private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+  private final boolean _updateIdealStateInstancePartitions;
 
   public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver,
       @Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker,
       @Nullable TableSizeReader tableSizeReader,
-      @Nullable PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+      @Nullable PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+      boolean updateIdealStateInstancePartitions) {
     _helixManager = helixManager;
     _tableRebalanceObserver = Objects.requireNonNullElseGet(tableRebalanceObserver, NoOpTableRebalanceObserver::new);
     _helixDataAccessor = helixManager.getHelixDataAccessor();
     _controllerMetrics = controllerMetrics;
     _rebalancePreChecker = rebalancePreChecker;
     _tableSizeReader = tableSizeReader;
     _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+    _updateIdealStateInstancePartitions = updateIdealStateInstancePartitions;
   }
 
   public TableRebalancer(HelixManager helixManager) {
-    this(helixManager, null, null, null, null, null);
+    this(helixManager, null, null, null, null, null, true);

Review Comment:
   This constructor is mainly used in tests, so I think it makes sense to keep the flag enabled here to help catch any regressions?



##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java:
##########
@@ -113,6 +114,12 @@ public List<String> getInstances(int partitionId, int replicaGroupId) {
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId);
   }
 
+  /// Given a partition ID and replica group ID like "0_0", return the list of instances belonging to that instance
+  /// partition
+  public List<String> getInstances(String partitionReplica) {

Review Comment:
   After the other update, this overload is now only used in tests.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1854,7 +1809,7 @@ public void addTable(TableConfig tableConfig, List<Pair<PartitionGroupMetadata,
       Preconditions.checkState(tableConfig != null, "Failed to read table config for table: %s", tableNameWithType);
 
       // Assign instances
-      assignInstances(tableConfig, true);
+      assignInstances(tableConfig, idealState, true);

Review Comment:
   Hm, I can revert it for the update-table path (when `override` is false), which only does instance assignment for new instance partitions, and rely on table rebalance to update the ideal state as in the other paths. But for new tables this is required, since they wouldn't otherwise have the ideal state instance partitions metadata.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -2242,4 +2303,15 @@ private IdealState forceCommitConsumingSegmentsAndWait(String tableNameWithType,
     tableRebalanceLogger.info("Successfully force committed {} consuming segments", segmentsToCommit.size());
     return _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
   }
+
+  private IdealState replaceInstancePartitionsInIdealState(String tableNameWithType, IdealState currentIdealState,
+      List<InstancePartitions> instancePartitionsList) {
+    Map<String, List<String>> idealStateListFields = currentIdealState.getRecord().getListFields();
+    InstancePartitionsUtils.replaceInstancePartitionsInIdealState(currentIdealState, instancePartitionsList);
+
+    return HelixHelper.updateIdealState(_helixManager, tableNameWithType, is -> {

Review Comment:
   There are three callers of this method:
   
   1. In the `segmentAssignmentUnchanged` case where the instance partitions have changed, this method is called to update the ideal state instance partitions just before completing the rebalance. I think this one is safe.
   2. At the end of the rebalance, once the current assignment matches the target assignment. I think this one is safe too?
   3. Before starting the actual rebalance. I guess this is the one you're concerned about?
   
   I've updated the wipe-out logic so that it is performed alongside the IS change, but the restoration at the end is still a separate update, because it looks like there could be cases where the assignment reaches the target assignment outside of the rebalance-initiated IS updates.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -296,6 +301,18 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
           "Cannot rebalance disabled table without downtime", null, null, null, null, null);
     }
 
+    // Wipe out ideal state instance partitions metadata

Review Comment:
   > If we wipe it here, and following part throws exception, we might end up 
with an IS without instance partitions
   
   That would cause the rebalance to fail, in which case it will be retried anyway, right?
   
   > E.g. when segmentAssignmentUnchanged, we should check if instance 
partitions changed, then modify accordingly.
   
   We're already updating the ideal state instance partitions in the `segmentAssignmentUnchanged` case when the instance partitions have changed. But good point about `segmentAssignmentUnchanged`: when there's no instance partitions change, I think that path would've wiped out the ideal state instance partitions. I've updated the logic.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -722,6 +764,25 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
         tableRebalanceLogger.info(msg);
         // Record completion
         _tableRebalanceObserver.onSuccess(msg);
+
+        if (_updateIdealStateInstancePartitions) {
+          // Rebalance completed successfully, so we can update the instance partitions in the ideal state to reflect
+          // the new set of instance partitions.
+          List<InstancePartitions> instancePartitionsList = new ArrayList<>(instancePartitionsMap.values());

Review Comment:
   The existing one should've been wiped out before this point though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]