jtao15 commented on a change in pull request #8441:
URL: https://github.com/apache/pinot/pull/8441#discussion_r840126785



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
##########
@@ -145,6 +145,8 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
         RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
+    boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
+        RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);

Review comment:
       Should we throw an exception when `retainInstanceSequence = true` and `reassignInstances = true`, or when the table does not allow instance assignment?
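
If the answer is yes, a fail-fast check could look like the sketch below. It assumes the reading that `retainInstanceSequence = true` conflicts with either `reassignInstances = true` or a table that does not allow instance assignment; whether those combinations are really invalid is exactly what this comment asks. The class name, method name, and the `instanceAssignmentAllowed` parameter are hypothetical and not part of Pinot, and a plain `IllegalArgumentException` stands in for whatever the rebalancer would actually throw.

```java
final class RebalanceFlagCheck {
  private RebalanceFlagCheck() {
  }

  /**
   * Hypothetical validation of the rebalance flags.
   * Assumption: retaining the instance sequence is rejected when instances are
   * being reassigned or when the table does not allow instance assignment.
   */
  public static void validate(boolean retainInstanceSequence, boolean reassignInstances,
      boolean instanceAssignmentAllowed) {
    if (!retainInstanceSequence) {
      // Nothing to validate when the sequence is not retained.
      return;
    }
    if (reassignInstances) {
      throw new IllegalArgumentException(
          "retainInstanceSequence cannot be combined with reassignInstances");
    }
    if (!instanceAssignmentAllowed) {
      throw new IllegalArgumentException(
          "retainInstanceSequence requires a table that allows instance assignment");
    }
  }
}
```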

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
##########
@@ -121,12 +190,40 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String table
       LOGGER.info("Selecting pools: {} for table: {}", poolsToSelect, _tableNameWithType);
       pools.retainAll(poolsToSelect);
     } else {
-      // Non-pool based selection
+      // Non-pool based selection. All the instances should be associated with a single pool, which is always 0.
+      // E.g.: Pool0 -> [ I1, I2, I3, I4, I5, I6 ]
 
-      LOGGER.info("Selecting {} instances for table: {}", numCandidateInstances, _tableNameWithType);
+      LOGGER.info("Selecting {} instances for table: {}", candidateInstanceConfigsMap.size(), _tableNameWithType);
       // Put all instance configs as pool 0
-      poolToInstanceConfigsMap.put(0, candidateInstanceConfigs);
+
+      for (Map.Entry<Integer, List<String>> entry : existingPoolToInstancesMap.entrySet()) {

Review comment:
       This assumes the pools do not change between `existingInstancePartitions` and `instanceConfigs`. Is it possible that all the instances in a pool were removed? In other words, could `poolToNewInstanceConfigsMap.get(pool)` be empty because the pools have changed?
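
To illustrate the concern: the pool-based branch of this diff calls `poolToInstanceConfigsMap.get(pool).get(existingInstance)`, which throws a `NullPointerException` if a pool from the existing partitions no longer appears in the latest instance configs. Below is a minimal sketch of a defensive lookup. The class and method names are hypothetical, and plain `String` values stand in for `InstanceConfig` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class PoolLookupSketch {
  private PoolLookupSketch() {
  }

  /**
   * Returns the existing instances of a pool that are still present in the latest
   * view, in their existing order. A pool that is missing from the latest view
   * yields an empty list instead of a NullPointerException.
   */
  public static List<String> survivingInstances(int pool, List<String> existingInstances,
      Map<Integer, Map<String, String>> poolToInstanceConfigsMap) {
    // Fall back to an empty map when the whole pool has disappeared.
    Map<String, String> latestInPool =
        poolToInstanceConfigsMap.getOrDefault(pool, Collections.emptyMap());
    List<String> surviving = new ArrayList<>();
    for (String instance : existingInstances) {
      if (latestInPool.containsKey(instance)) {
        surviving.add(instance);
      }
    }
    return surviving;
  }
}
```

A caller could then decide what to do with an empty result, for example skip the pool or fail with a descriptive message.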

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
##########
@@ -48,52 +52,117 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String table
 
   /**
    * Returns a map from pool to instance configs based on the tag and pool config for the given instance configs.
+   * @param instanceConfigs list of latest instance configs from ZK.
+   * @param existingPoolToInstancesMap existing instance with sequence that should be respected. An empty list
+   *                                      means no preceding sequence to respect and the instances would be sorted.
    */
-  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) {
+  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs,
+      Map<Integer, List<String>> existingPoolToInstancesMap) {
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash);
 
-    // Filter out the instances with the correct tag
+    // If existingPoolToInstancesMap is null, treat it as an empty map.
+    if (existingPoolToInstancesMap == null) {
+      existingPoolToInstancesMap = Collections.emptyMap();
+    }
+    // Filter out the instances with the correct tag.
+    // Use LinkedHashMap here to retain the sorted list of instance names.
     String tag = _tagPoolConfig.getTag();
-    List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>();
+    Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>();
     for (InstanceConfig instanceConfig : instanceConfigs) {
       if (instanceConfig.getTags().contains(tag)) {
-        candidateInstanceConfigs.add(instanceConfig);
+        candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig);
       }
     }
-    candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName));
-    int numCandidateInstances = candidateInstanceConfigs.size();
+
+    // Find out newly added instances from the latest copies of instance configs.
+    // A deque is used here in order to retain the sequence,
+    // given the fact that the list of instance configs is always sorted.
+    Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet());
+    for (List<String> existingInstancesWithSequence : existingPoolToInstancesMap.values()) {
+      newlyAddedInstances.removeAll(existingInstancesWithSequence);
+    }
+
+    int numCandidateInstances = candidateInstanceConfigsMap.size();
     Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag);
     LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType);
 
-    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>();
+    Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>();
     if (_tagPoolConfig.isPoolBased()) {
-      // Pool based selection
+      // Pool based selection. All the instances should be associated with a specific pool number.
+      // Instance selection should be done within the same pool.
+      // E.g.: Pool0 -> [ I1, I2, I3 ]
+      //       Pool1 -> [ I4, I5, I6 ]
 
-      // Extract the pool information from the instance configs
-      for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
+      // Each pool number associates with a map that key is the instance name and value is the instance config.
+      Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+      // Each pool number associates with a list of newly added instance configs,
+      // so that new instances can be fetched from this list.
+      Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>();
+
+      // Extract the pool information from the instance configs.
+      for (Map.Entry<String, InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) {
+        String instanceName = entry.getKey();
+        InstanceConfig instanceConfig = entry.getValue();
         Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
         if (poolMap != null && poolMap.containsKey(tag)) {
           int pool = Integer.parseInt(poolMap.get(tag));
-          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig);
+          if (newlyAddedInstances.contains(instanceName)) {
+            poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig);
+          }
+        }
+      }
+
+      for (Map.Entry<Integer, List<String>> entry : existingPoolToInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<String> existingInstanceAssignmentInPool = entry.getValue();
+        List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+        for (String existingInstance: existingInstanceAssignmentInPool) {
+          InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance);
+          // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+          // The missing/removed instances will be replaced by the newly instances.
+          // If the instance still exists from ZK, then add it to the candidate list.
+          // E.g. if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6],
+          // the removed instance is I2 and the newly added instances are I5 and I6.
+          // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+          // Thus, the new order would be [I1, I5, I3, I4, I6].

Review comment:
       `HashBasedRotateInstanceConstraintApplier` rotates the list by an amount that depends on the number of instances. So even if we retain the instance order here, can the order still be different after the constraints are applied?
   Say we need to select 4 instances (2 replicas * 2 instances/replica), and the rotated list is `[I6, I1, I5, I3, I4]`; then we will pick `[I6, I1, I5, I3]`. This does not guarantee minimum segment movement, because we need to move all segments from `I4` to `I5` or `I6`. Is this a valid case?
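
The scenario can be reproduced with a small standalone sketch. `retainSequence` mirrors the replacement rule described in the code comment of the diff (a removed instance's slot is taken by the next newly added instance, and leftovers are appended). `rotateAndPick` only approximates the constraint applier: using `Collections.rotate` with an explicit distance is an assumption made for illustration, not the actual logic of `HashBasedRotateInstanceConstraintApplier`. All names are hypothetical, and instance names stand in for `InstanceConfig` objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class SequenceRotationSketch {
  private SequenceRotationSketch() {
  }

  /**
   * Keeps surviving instances at their existing positions, fills the slots of
   * removed instances with newly added ones, and appends any remaining new
   * instances at the tail.
   */
  public static List<String> retainSequence(List<String> existing, List<String> latest) {
    Set<String> latestSet = new LinkedHashSet<>(latest);
    // Newly added instances, in the order they appear in the latest list.
    Deque<String> newlyAdded = new ArrayDeque<>(latest);
    newlyAdded.removeAll(existing);

    List<String> result = new ArrayList<>();
    for (String instance : existing) {
      if (latestSet.contains(instance)) {
        result.add(instance);
      } else if (!newlyAdded.isEmpty()) {
        // The removed instance's slot is taken by the next new instance.
        result.add(newlyAdded.pollFirst());
      }
    }
    result.addAll(newlyAdded);
    return result;
  }

  /**
   * Rotates a copy of the list by the given distance and picks the first
   * numToSelect entries. The rotation distance is supplied by the caller
   * (assumption: the real applier derives it from a hash).
   */
  public static List<String> rotateAndPick(List<String> ordered, int distance, int numToSelect) {
    List<String> rotated = new ArrayList<>(ordered);
    Collections.rotate(rotated, distance);
    return new ArrayList<>(rotated.subList(0, Math.min(numToSelect, rotated.size())));
  }
}
```

With existing instances `[I1, I2, I3, I4]` and latest instances `[I1, I3, I4, I5, I6]`, `retainSequence` returns `[I1, I5, I3, I4, I6]`. Rotating that list by one position gives `[I6, I1, I5, I3, I4]`, and picking four instances yields `[I6, I1, I5, I3]`, which drops `I4` even though it was part of the original assignment.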




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


