jackjlli commented on a change in pull request #8441:
URL: https://github.com/apache/pinot/pull/8441#discussion_r840818830



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
##########
@@ -48,52 +52,117 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig 
tagPoolConfig, String table
 
   /**
    * Returns a map from pool to instance configs based on the tag and pool 
config for the given instance configs.
+   * @param instanceConfigs list of latest instance configs from ZK.
+   * @param existingPoolToInstancesMap existing instance with sequence that 
should be respected. An empty list
+   *                                      means no preceding sequence to 
respect and the instances would be sorted.
    */
-  public Map<Integer, List<InstanceConfig>> 
selectInstances(List<InstanceConfig> instanceConfigs) {
+  public Map<Integer, List<InstanceConfig>> 
selectInstances(List<InstanceConfig> instanceConfigs,
+      Map<Integer, List<String>> existingPoolToInstancesMap) {
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     LOGGER.info("Starting instance tag/pool selection for table: {} with hash: 
{}", _tableNameWithType, tableNameHash);
 
-    // Filter out the instances with the correct tag
+    // If existingPoolToInstancesMap is null, treat it as an empty map.
+    if (existingPoolToInstancesMap == null) {
+      existingPoolToInstancesMap = Collections.emptyMap();
+    }
+    // Filter out the instances with the correct tag.
+    // Use LinkedHashMap here to retain the sorted list of instance names.
     String tag = _tagPoolConfig.getTag();
-    List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>();
+    Map<String, InstanceConfig> candidateInstanceConfigsMap = new 
LinkedHashMap<>();
     for (InstanceConfig instanceConfig : instanceConfigs) {
       if (instanceConfig.getTags().contains(tag)) {
-        candidateInstanceConfigs.add(instanceConfig);
+        candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), 
instanceConfig);
       }
     }
-    
candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName));
-    int numCandidateInstances = candidateInstanceConfigs.size();
+
+    // Find out newly added instances from the latest copies of instance 
configs.
+    // A deque is used here in order to retain the sequence,
+    // given the fact that the list of instance configs is always sorted.
+    Deque<String> newlyAddedInstances = new 
LinkedList<>(candidateInstanceConfigsMap.keySet());
+    for (List<String> existingInstancesWithSequence : 
existingPoolToInstancesMap.values()) {
+      newlyAddedInstances.removeAll(existingInstancesWithSequence);
+    }
+
+    int numCandidateInstances = candidateInstanceConfigsMap.size();
     Preconditions.checkState(numCandidateInstances > 0, "No enabled instance 
has the tag: %s", tag);
     LOGGER.info("{} enabled instances have the tag: {} for table: {}", 
numCandidateInstances, tag, _tableNameWithType);
 
-    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new 
TreeMap<>();
+    Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new 
TreeMap<>();
     if (_tagPoolConfig.isPoolBased()) {
-      // Pool based selection
+      // Pool based selection. All the instances should be associated with a 
specific pool number.
+      // Instance selection should be done within the same pool.
+      // E.g.: Pool0 -> [ I1, I2, I3 ]
+      //       Pool1 -> [ I4, I5, I6 ]
 
-      // Extract the pool information from the instance configs
-      for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
+      // Each pool number associates with a map that key is the instance name 
and value is the instance config.
+      Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new 
HashMap<>();
+      // Each pool number associates with a list of newly added instance 
configs,
+      // so that new instances can be fetched from this list.
+      Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new 
HashMap<>();
+
+      // Extract the pool information from the instance configs.
+      for (Map.Entry<String, InstanceConfig> entry : 
candidateInstanceConfigsMap.entrySet()) {
+        String instanceName = entry.getKey();
+        InstanceConfig instanceConfig = entry.getValue();
         Map<String, String> poolMap = 
instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
         if (poolMap != null && poolMap.containsKey(tag)) {
           int pool = Integer.parseInt(poolMap.get(tag));
-          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(instanceConfig);
+          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new 
TreeMap<>()).put(instanceName, instanceConfig);
+          if (newlyAddedInstances.contains(instanceName)) {
+            poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new 
LinkedList<>()).add(instanceConfig);
+          }
+        }
+      }
+
+      for (Map.Entry<Integer, List<String>> entry : 
existingPoolToInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<String> existingInstanceAssignmentInPool = entry.getValue();
+        List<InstanceConfig> candidateInstanceConfigsWithSequence = new 
ArrayList<>();
+        for (String existingInstance: existingInstanceAssignmentInPool) {
+          InstanceConfig instanceConfig = 
poolToInstanceConfigsMap.get(pool).get(existingInstance);
+          // Add instances to the candidate list and respect the sequence of 
the existing instances from the ZK.
+          // The missing/removed instances will be replaced by the newly 
instances.
+          // If the instance still exists from ZK, then add it to the 
candidate list.
+          // E.g. if the old instances are: [I1, I2, I3, I4] and the new 
instance are: [I1, I3, I4, I5, I6],
+          // the removed instance is I2 and the newly added instances are I5 
and I6.
+          // The position of I2 would be replaced by I5, the new remaining I6 
would be appended to the tail.
+          // Thus, the new order would be [I1, I5, I3, I4, I6].

Review comment:
       Yes, this is a valid case. If the pool exists in 
`existingPoolToInstancesMap`, we don't actually need to rotate the list; 
otherwise, we just do the rotation as in the current behavior. Add a class called 
`RetainedSequenceInstanceConstraintApplier` to extend the existing default 
applier.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to