tflobbe commented on code in PR #1650:
URL: https://github.com/apache/solr/pull/1650#discussion_r1223482384


##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java:
##########
@@ -135,190 +100,35 @@ public void call(ClusterState state, ZkNodeProps 
message, NamedList<Object> resu
         assignRequests.add(assignRequest);
       }
       Assign.AssignStrategy assignStrategy = 
Assign.createAssignStrategy(ccc.getCoreContainer());
-      replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), 
assignRequests);
-    }
-    int replicaPositionIdx = 0;
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Going to create replica for collection={} shard={} on node={}",
-            sourceCollection,
-            sourceReplica.getStr(SHARD_ID_PROP),
-            target);
-      }
-      String targetNode;
-      // Use the assigned replica positions, if target is null or empty 
(checked above)
-      if (replicaPositions != null) {
-        targetNode = replicaPositions.get(replicaPositionIdx).node;
-        replicaPositionIdx++;
-      } else {
-        targetNode = target;
-      }
-      ZkNodeProps msg =
-          sourceReplica
-              .plus("parallel", String.valueOf(parallel))
-              .plus(CoreAdminParams.NODE, targetNode);
-      if (async != null) msg.getProperties().put(ASYNC, async);
-      NamedList<Object> nl = new NamedList<>();
-      final ZkNodeProps addedReplica =
-          new AddReplicaCmd(ccc)
-              .addReplica(
-                  clusterState,
-                  msg,
-                  nl,
-                  () -> {
-                    countDownLatch.countDown();
-                    if (nl.get("failure") != null) {
-                      String errorString =
-                          String.format(
-                              Locale.ROOT,
-                              "Failed to create replica for collection=%s 
shard=%s" + " on node=%s",
-                              sourceCollection,
-                              sourceReplica.getStr(SHARD_ID_PROP),
-                              target);
-                      log.warn(errorString);
-                      // one replica creation failed. Make the best attempt to
-                      // delete all the replicas created so far in the target
-                      // and exit
-                      synchronized (results) {
-                        results.add("failure", errorString);
-                        anyOneFailed.set(true);
-                      }
-                    } else {
-                      if (log.isDebugEnabled()) {
-                        log.debug(
-                            "Successfully created replica for collection={} 
shard={} on node={}",
-                            sourceCollection,
-                            sourceReplica.getStr(SHARD_ID_PROP),
-                            target);
-                      }
-                    }
-                  })
-              .get(0);
-
-      if (addedReplica != null) {
-        createdReplicas.add(addedReplica);
-        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || 
waitForFinalState) {
-          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-          String replicaName = 
sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-          String collectionName = sourceCollection;
-          String key = collectionName + "_" + replicaName;
-          CollectionStateWatcher watcher;
-          if (waitForFinalState) {
-            watcher =
-                new ActiveReplicaWatcher(
-                    collectionName,
-                    null,
-                    
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
-                    replicasToRecover);
-          } else {
-            watcher =
-                new LeaderRecoveryWatcher(
-                    collectionName,
-                    shardName,
-                    replicaName,
-                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
-                    replicasToRecover);
-          }
-          watchers.put(key, watcher);
-          log.debug("--- adding {}, {}", key, watcher);
-          zkStateReader.registerCollectionStateWatcher(collectionName, 
watcher);
-        } else {
-          log.debug("--- not waiting for {}", addedReplica);
-        }
+      List<ReplicaPosition> replicaPositions =
+          assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
+      int position = 0;
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, 
replicaPositions.get(position++).node);
       }
-    }
-
-    log.debug("Waiting for replicas to be added");
-    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for replicas to be added");
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for replicas to be added");
-    }
-
-    // now wait for leader replicas to recover
-    log.debug("Waiting for {} leader replicas to recover", numLeaders);
-    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Timed out waiting for {} leader replicas to recover", 
replicasToRecover.getCount());
-      }
-      anyOneFailed.set(true);
     } else {
-      log.debug("Finished waiting for leader replicas to recover");
-    }
-    // remove the watchers, we're done either way
-    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
-      zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
-    }
-    if (anyOneFailed.get()) {
-      log.info("Failed to create some replicas. Cleaning up all replicas on 
target node");
-      SolrCloseableLatch cleanupLatch =
-          new SolrCloseableLatch(createdReplicas.size(), 
ccc.getCloseableToLatchOn());
-      for (ZkNodeProps createdReplica : createdReplicas) {
-        NamedList<Object> deleteResult = new NamedList<>();
-        try {
-          new DeleteReplicaCmd(ccc)
-              .deleteReplica(
-                  zkStateReader.getClusterState(),
-                  createdReplica.plus("parallel", "true"),
-                  deleteResult,
-                  () -> {
-                    cleanupLatch.countDown();
-                    if (deleteResult.get("failure") != null) {
-                      synchronized (results) {
-                        results.add(
-                            "failure",
-                            "Could not cleanup, because of : " + 
deleteResult.get("failure"));
-                      }
-                    }
-                  });
-        } catch (KeeperException e) {
-          cleanupLatch.countDown();
-          log.warn("Error deleting replica ", e);
-        } catch (Exception e) {
-          log.warn("Error deleting replica ", e);
-          cleanupLatch.countDown();
-          throw e;
-        }
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, target);
       }
-      cleanupLatch.await(5, TimeUnit.MINUTES);
-      return;
     }
 
-    // we have reached this far means all replicas could be recreated
-    // now cleanup the replicas in the source node
-    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, 
async);
-    results.add(
-        "success",
-        "REPLACENODE action completed successfully from  : " + source + " to : 
" + target);
+    boolean migrationSuccessful =
+        ReplicaMigrationUtils.migrateReplicas(
+            ccc, replicaMovements, parallel, waitForFinalState, timeout, 
async, results);
+    if (migrationSuccessful) {
+      results.add(
+          "success",
+          "REPLACENODE action completed successfully from  : " + source + " to 
: " + target);
+    }
   }
 
-  static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState 
state) {
-    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
+  static List<Replica> getReplicasOfNode(String source, ClusterState state) {

Review Comment:
   maybe rename `source` to `node` or `nodeName`?
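   Something like this, perhaps (just a sketch to show the rename; the body here is only illustrative, not the PR's actual implementation):
   ```java
   // Sketch only: signature with the suggested rename, filtering replicas by node name.
   static List<Replica> getReplicasOfNode(String nodeName, ClusterState state) {
     List<Replica> replicas = new ArrayList<>();
     for (DocCollection collection : state.getCollectionsMap().values()) {
       for (Replica replica : collection.getReplicas()) {
         if (nodeName.equals(replica.getNodeName())) {
           replicas.add(replica);
         }
       }
     }
     return replicas;
   }
   ```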



##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicaMigrationUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Code to migrate replicas to already chosen nodes. This will create new 
replicas, and delete the
+   * old replicas after the creation is done.
+   *
+   * <p>If an error occurs during the creation of new replicas, all new 
replicas will be deleted.
+   *
+   * @param ccc The collection command context to use from the API that calls 
this method
+   * @param movements a map from replica to the new node that the replica 
should live on
+   * @param parallel whether the replica creations should be done in parallel
+   * @param waitForFinalState wait for the final state of all newly created 
replicas before
+   *     continuing
+   * @param timeout the amount of time to wait for new replicas to be created
+   * @param asyncId If provided, the command will be run under the given 
asyncId
+   * @param results push results (successful and failure) onto this list
+   * @return whether the command was successful
+   */
+  static boolean migrateReplicas(
+      CollectionCommandContext ccc,
+      Map<Replica, String> movements,
+      boolean parallel,
+      boolean waitForFinalState,
+      int timeout,
+      String asyncId,
+      NamedList<Object> results)
+      throws IOException, InterruptedException, KeeperException {
+    // how many leaders are we moving? for these replicas we have to make sure 
that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can 
become the new leader)
+    // If waitForFinalState=true we wait for all replicas
+    int numLeaders = 0;
+    for (Replica replica : movements.keySet()) {
+      if (replica.isLeader() || waitForFinalState) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
+    List<ZkNodeProps> createdReplicas = new ArrayList<>();
+
+    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
+    SolrCloseableLatch countDownLatch =
+        new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
+
+    SolrCloseableLatch replicasToRecover =
+        new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
+
+    ClusterState clusterState = ccc.getZkStateReader().getClusterState();
+
+    for (Map.Entry<Replica, String> movement : movements.entrySet()) {
+      Replica sourceReplica = movement.getKey();
+      String targetNode = movement.getValue();
+      String sourceCollection = sourceReplica.getCollection();
+      if (log.isInfoEnabled()) {
+        log.info(
+            "Going to create replica for collection={} shard={} on node={}",
+            sourceCollection,
+            sourceReplica.getShard(),
+            targetNode);
+      }
+
+      ZkNodeProps msg =
+          sourceReplica
+              .toFullProps()
+              .plus("parallel", String.valueOf(parallel))
+              .plus(CoreAdminParams.NODE, targetNode);
+      if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
+      NamedList<Object> nl = new NamedList<>();
+      final ZkNodeProps addedReplica =
+          new AddReplicaCmd(ccc)
+              .addReplica(
+                  clusterState,
+                  msg,
+                  nl,
+                  () -> {
+                    countDownLatch.countDown();
+                    if (nl.get("failure") != null) {
+                      String errorString =
+                          String.format(
+                              Locale.ROOT,
+                              "Failed to create replica for collection=%s 
shard=%s" + " on node=%s",
+                              sourceCollection,
+                              sourceReplica.getShard(),
+                              targetNode);
+                      log.warn(errorString);
+                      // one replica creation failed. Make the best attempt to
+                      // delete all the replicas created so far in the target
+                      // and exit
+                      synchronized (results) {
+                        results.add("failure", errorString);
+                        anyOneFailed.set(true);
+                      }
+                    } else {
+                      if (log.isDebugEnabled()) {
+                        log.debug(
+                            "Successfully created replica for collection={} 
shard={} on node={}",
+                            sourceCollection,
+                            sourceReplica.getShard(),
+                            targetNode);
+                      }
+                    }
+                  })
+              .get(0);
+
+      if (addedReplica != null) {
+        createdReplicas.add(addedReplica);
+        if (sourceReplica.isLeader() || waitForFinalState) {
+          String shardName = sourceReplica.getShard();
+          String replicaName = sourceReplica.getName();
+          String key = sourceCollection + "_" + replicaName;
+          CollectionStateWatcher watcher;
+          if (waitForFinalState) {
+            watcher =
+                new ActiveReplicaWatcher(
+                    sourceCollection,
+                    null,
+                    
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
+                    replicasToRecover);
+          } else {
+            watcher =
+                new LeaderRecoveryWatcher(
+                    sourceCollection,
+                    shardName,
+                    replicaName,
+                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
+                    replicasToRecover);
+          }
+          watchers.put(key, watcher);
+          log.debug("--- adding {}, {}", key, watcher);
+          
ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, 
watcher);
+        } else {
+          log.debug("--- not waiting for {}", addedReplica);
+        }
+      }
+    }
+
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
+    }
+
+    // now wait for leader replicas to recover
+    log.debug("Waiting for {} leader replicas to recover", numLeaders);
+    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+      if (log.isInfoEnabled()) {
+        log.info(
+            "Timed out waiting for {} leader replicas to recover", 
replicasToRecover.getCount());
+      }
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for leader replicas to recover");
+    }
+    // remove the watchers, we're done either way
+    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
+      ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), 
e.getValue());
+    }
+    if (anyOneFailed.get()) {
+      log.info("Failed to create some replicas. Cleaning up all replicas on 
target node");
+      SolrCloseableLatch cleanupLatch =
+          new SolrCloseableLatch(createdReplicas.size(), 
ccc.getCloseableToLatchOn());
+      for (ZkNodeProps createdReplica : createdReplicas) {
+        NamedList<Object> deleteResult = new NamedList<>();
+        try {
+          new DeleteReplicaCmd(ccc)
+              .deleteReplica(
+                  ccc.getZkStateReader().getClusterState(),
+                  createdReplica.plus("parallel", "true"),
+                  deleteResult,
+                  () -> {
+                    cleanupLatch.countDown();
+                    if (deleteResult.get("failure") != null) {
+                      synchronized (results) {
+                        results.add(
+                            "failure",
+                            "Could not cleanup, because of : " + 
deleteResult.get("failure"));
+                      }
+                    }
+                  });
+        } catch (KeeperException e) {
+          cleanupLatch.countDown();
+          log.warn("Error deleting replica ", e);
+        } catch (Exception e) {
+          log.warn("Error deleting replica ", e);
+          cleanupLatch.countDown();
+          throw e;
+        }
+      }
+      cleanupLatch.await(5, TimeUnit.MINUTES);
+      return false;
+    }
+
+    // we have reached this far, meaning all replicas should have been 
recreated.
+    // now cleanup the original replicas
+    return cleanupReplicas(
+        results, ccc.getZkStateReader().getClusterState(), movements.keySet(), 
ccc, asyncId);
+  }
+
+  static boolean cleanupReplicas(
+      NamedList<Object> results,
+      ClusterState clusterState,
+      Collection<Replica> sourceReplicas,
+      CollectionCommandContext ccc,
+      String async)
+      throws IOException, InterruptedException {
+    CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
+    for (Replica sourceReplica : sourceReplicas) {
+      String coll = sourceReplica.getCollection();
+      String shard = sourceReplica.getShard();
+      String type = sourceReplica.getType().toString();
+      String node = sourceReplica.getNodeName();
+      log.info(
+          "Deleting replica type={} for collection={} shard={} on node={}",
+          type,
+          coll,
+          shard,
+          node);
+      NamedList<Object> deleteResult = new NamedList<>();
+      try {
+        ZkNodeProps cmdMessage = sourceReplica.toFullProps();
+        if (async != null) cmdMessage = cmdMessage.plus(ASYNC, async);
+        new DeleteReplicaCmd(ccc)
+            .deleteReplica(
+                clusterState,
+                cmdMessage.plus("parallel", "true"),
+                deleteResult,
+                () -> {
+                  cleanupLatch.countDown();
+                  if (deleteResult.get("failure") != null) {
+                    synchronized (results) {
+                      results.add(
+                          "failure",
+                          String.format(
+                              Locale.ROOT,
+                              "Failed to delete replica for collection=%s 
shard=%s" + " on node=%s",

Review Comment:
   No need to concatenate the string literals here, this can just be a single literal
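   i.e. something like the following (sketch only; `coll`, `shard`, `node` are assumed to be the locals defined a few lines above):
   ```java
   // Sketch: one string literal instead of concatenating two adjacent literals.
   String.format(
       Locale.ROOT,
       "Failed to delete replica for collection=%s shard=%s on node=%s",
       coll,
       shard,
       node)
   ```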



##########
solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java:
##########
@@ -251,1006 +231,463 @@ private AffinityPlacementPlugin(
 
       // We make things reproducible in tests by using test seed if any
       String seed = System.getProperty("tests.seed");
-      if (seed != null) {
-        replicaPlacementRandom.setSeed(seed.hashCode());
-      }
-    }
-
-    @Override
-    @SuppressForbidden(
-        reason =
-            "Ordering.arbitrary() has no equivalent in Comparator class. 
Rather reuse than copy.")
-    public List<PlacementPlan> computePlacements(
-        Collection<PlacementRequest> requests, PlacementContext 
placementContext)
-        throws PlacementException {
-      List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
-      Set<Node> allNodes = new HashSet<>();
-      Set<SolrCollection> allCollections = new HashSet<>();
-      for (PlacementRequest request : requests) {
-        allNodes.addAll(request.getTargetNodes());
-        allCollections.add(request.getCollection());
-      }
-
-      // Fetch attributes for a superset of all nodes requested amongst the 
placementRequests
-      AttributeFetcher attributeFetcher = 
placementContext.getAttributeFetcher();
-      attributeFetcher
-          
.requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
-          .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
-          
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
-          
.requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
-      attributeFetcher
-          .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES)
-          .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB);
-      Set<ReplicaMetric<?>> replicaMetrics = 
Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB);
-      for (SolrCollection collection : allCollections) {
-        attributeFetcher.requestCollectionMetrics(collection, replicaMetrics);
-      }
-      attributeFetcher.fetchFrom(allNodes);
-      final AttributeValues attrValues = attributeFetcher.fetchAttributes();
-      // Get the number of currently existing cores per node, so we can update 
as we place new cores
-      // to not end up always selecting the same node(s). This is used across 
placement requests
-      Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes, 
attrValues);
-
-      boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes, 
attrValues);
-
-      // Keep track with nodesWithReplicas across requests
-      Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new 
HashMap<>();
-      for (PlacementRequest request : requests) {
-        Set<Node> nodes = request.getTargetNodes();
-        SolrCollection solrCollection = request.getCollection();
-
-        // filter out nodes that don't meet the `withCollection` constraint
-        nodes =
-            filterNodesWithCollection(placementContext.getCluster(), request, 
attrValues, nodes);
-        // filter out nodes that don't match the "node types" specified in the 
collection props
-        nodes = filterNodesByNodeType(placementContext.getCluster(), request, 
attrValues, nodes);
-
-        // All available zones of live nodes. Due to some nodes not being 
candidates for placement,
-        // and some existing replicas being one availability zones that might 
be offline (i.e. their
-        // nodes are not live), this set might contain zones on which it is 
impossible to place
-        // replicas. That's ok.
-        Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues);
-
-        // Build the replica placement decisions here
-        Set<ReplicaPlacement> replicaPlacements = new HashSet<>();
-
-        // Let's now iterate on all shards to create replicas for and start 
finding home sweet homes
-        // for the replicas
-        for (String shardName : request.getShardNames()) {
-          ReplicaMetrics leaderMetrics =
-              attrValues
-                  .getCollectionMetrics(solrCollection.getName())
-                  .flatMap(colMetrics -> colMetrics.getShardMetrics(shardName))
-                  .flatMap(ShardMetrics::getLeaderMetrics)
-                  .orElse(null);
-
-          // Split the set of nodes into 3 sets of nodes accepting each 
replica type (sets can
-          // overlap
-          // if nodes accept multiple replica types). These subsets sets are 
actually maps, because
-          // we
-          // capture the number of cores (of any replica type) present on each 
node.
-          //
-          // This also filters out nodes that will not satisfy the rules if 
the replica is placed
-          // there
-          EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
-              getAvailableNodesForReplicaTypes(nodes, attrValues, 
leaderMetrics);
-
-          // Inventory nodes (if any) that already have a replica of any type 
for the shard, because
-          // we can't be placing additional replicas on these. This data 
structure is updated after
-          // each replica to node assign and is used to make sure different 
replica types are not
-          // allocated to the same nodes (protecting same node assignments 
within a given replica
-          // type is done "by construction" in makePlacementDecisions()).
-          Set<Node> nodesWithReplicas =
-              allNodesWithReplicas
-                  .computeIfAbsent(solrCollection.getName(), col -> new 
HashMap<>())
-                  .computeIfAbsent(
-                      shardName,
-                      s -> {
-                        Set<Node> newNodeSet = new HashSet<>();
-                        Shard shard = solrCollection.getShard(s);
-                        if (shard != null) {
-                          // Prefill the set with the existing replicas
-                          for (Replica r : shard.replicas()) {
-                            newNodeSet.add(r.getNode());
-                          }
-                        }
-                        return newNodeSet;
-                      });
-
-          // Iterate on the replica types in the enum order. We place more 
strategic replicas first
-          // (NRT is more strategic than TLOG more strategic than PULL). This 
is in case we
-          // eventually decide that less strategic replica placement 
impossibility is not a problem
-          // that should lead to replica placement computation failure. 
Current code does fail if
-          // placement is impossible (constraint is at most one replica of a 
shard on any node).
-          for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) 
{
-            int numReplicasToCreate = 
request.getCountReplicasToCreate(replicaType);
-            if (numReplicasToCreate > 0) {
-              makePlacementDecisions(
-                  solrCollection,
-                  shardName,
-                  availabilityZones,
-                  replicaType,
-                  numReplicasToCreate,
-                  attrValues,
-                  leaderMetrics,
-                  replicaTypeToNodes,
-                  nodesWithReplicas,
-                  allCoresOnNodes,
-                  placementContext.getPlacementPlanFactory(),
-                  replicaPlacements,
-                  doSpreadAcrossDomains);
-            }
-          }
-        }
-        placementPlans.add(
-            placementContext
-                .getPlacementPlanFactory()
-                .createPlacementPlan(request, replicaPlacements));
-      }
-
-      return placementPlans;
-    }
-
-    private boolean shouldSpreadAcrossDomains(Set<Node> allNodes, 
AttributeValues attrValues) {
-      boolean doSpreadAcrossDomains =
-          spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues);
-      if (spreadAcrossDomains && !doSpreadAcrossDomains) {
-        log.warn(
-            "AffinityPlacementPlugin configured to spread across domains, but 
there are nodes in the cluster without the {} system property. Ignoring 
spreadAcrossDomains.",
-            AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
-      }
-      return doSpreadAcrossDomains;
-    }
-
-    private boolean spreadDomainPropPresent(Set<Node> allNodes, 
AttributeValues attrValues) {
-      // We can only use spread domains if all nodes have the system property
-      return allNodes.stream()
-          .noneMatch(
-              n ->
-                  attrValues
-                      .getSystemProperty(n, 
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
-                      .isEmpty());
     }
 
     @Override
-    public void verifyAllowedModification(
-        ModificationRequest modificationRequest, PlacementContext 
placementContext)
-        throws PlacementModificationException {
-      if (modificationRequest instanceof DeleteShardsRequest) {
-        log.warn("DeleteShardsRequest not implemented yet, skipping: {}", 
modificationRequest);
-      } else if (modificationRequest instanceof DeleteCollectionRequest) {
-        verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, 
placementContext);
-      } else if (modificationRequest instanceof DeleteReplicasRequest) {
-        verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, 
placementContext);
-      } else {
-        log.warn("unsupported request type, skipping: {}", 
modificationRequest);
-      }
-    }
-
-    private void verifyDeleteCollection(
+    protected void verifyDeleteCollection(
         DeleteCollectionRequest deleteCollectionRequest, PlacementContext 
placementContext)
         throws PlacementModificationException {
       Cluster cluster = placementContext.getCluster();
-      Set<String> colocatedCollections =
-          
colocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), 
Set.of());
-      for (String primaryName : colocatedCollections) {
+      Set<String> collocatedCollections =
+          
collocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), 
Set.of());
+      for (String primaryName : collocatedCollections) {
         try {
           if (cluster.getCollection(primaryName) != null) {
             // still exists
             throw new PlacementModificationException(
-                "colocated collection "
+                "collocated collection "
                     + primaryName
                     + " of "
                     + deleteCollectionRequest.getCollection().getName()
                     + " still present");
           }
         } catch (IOException e) {
           throw new PlacementModificationException(
-              "failed to retrieve colocated collection information", e);
+              "failed to retrieve collocated collection information", e);
         }
       }
     }
 
-    private void verifyDeleteReplicas(
-        DeleteReplicasRequest deleteReplicasRequest, PlacementContext 
placementContext)
-        throws PlacementModificationException {
-      Cluster cluster = placementContext.getCluster();
-      SolrCollection secondaryCollection = 
deleteReplicasRequest.getCollection();
-      Set<String> colocatedCollections = 
colocatedWith.get(secondaryCollection.getName());
-      if (colocatedCollections == null) {
-        return;
-      }
-      Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new 
HashMap<>();
-      secondaryCollection
-          .shards()
-          .forEach(
-              shard ->
-                  shard
-                      .replicas()
-                      .forEach(
-                          replica ->
-                              secondaryNodeShardReplicas
-                                  .computeIfAbsent(replica.getNode(), n -> new 
HashMap<>())
-                                  .computeIfAbsent(
-                                      replica.getShard().getShardName(), s -> 
new AtomicInteger())
-                                  .incrementAndGet()));
+    private static final class AffinityPlacementContext {
+      private final Set<String> allSpreadDomains = new HashSet<>();
+      private final Map<String, Map<String, ReplicaSpread>> spreadDomainUsage 
= new HashMap<>();
+      private final Set<String> allAvailabilityZones = new HashSet<>();
+      private final Map<String, Map<String, Map<Replica.ReplicaType, 
ReplicaSpread>>>
+          availabilityZoneUsage = new HashMap<>();
+      private boolean doSpreadAcrossDomains;
+    }
 
-      // find the colocated-with collections
-      Map<Node, Set<String>> colocatingNodes = new HashMap<>();
-      try {
-        for (String colocatedCollection : colocatedCollections) {
-          SolrCollection coll = cluster.getCollection(colocatedCollection);
-          coll.shards()
-              .forEach(
-                  shard ->
-                      shard
-                          .replicas()
-                          .forEach(
-                              replica ->
-                                  colocatingNodes
-                                      .computeIfAbsent(replica.getNode(), n -> 
new HashSet<>())
-                                      .add(coll.getName())));
-        }
-      } catch (IOException ioe) {
-        throw new PlacementModificationException(
-            "failed to retrieve colocated collection information", ioe);
-      }
-      PlacementModificationException exception = null;
-      for (Replica replica : deleteReplicasRequest.getReplicas()) {
-        if (!colocatingNodes.containsKey(replica.getNode())) {
-          continue;
-        }
-        // check that there will be at least one replica remaining
-        AtomicInteger secondaryCount =
-            secondaryNodeShardReplicas
-                .getOrDefault(replica.getNode(), Map.of())
-                .getOrDefault(replica.getShard().getShardName(), new 
AtomicInteger());
-        if (secondaryCount.get() > 1) {
-          // we can delete it - record the deletion
-          secondaryCount.decrementAndGet();
-          continue;
-        }
-        // fail - this replica cannot be removed
-        if (exception == null) {
-          exception = new PlacementModificationException("delete replica(s) 
rejected");
+    @Override
+    protected Map<Node, WeightedNode> getBaseWeightedNodes(
+        PlacementContext placementContext,
+        Set<Node> nodes,
+        Iterable<SolrCollection> relevantCollections,
+        boolean skipNodesWithErrors)
+        throws PlacementException {
+      // Fetch attributes for a superset of all nodes requested amongst the 
placementRequests
+      AttributeFetcher attributeFetcher = 
placementContext.getAttributeFetcher();
+      attributeFetcher
+          
.requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+          .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP)
+          
.requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
+          
.requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
+      attributeFetcher
+          .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES)
+          .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB);
+      Set<ReplicaMetric<?>> replicaMetrics = 
Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB);
+      Set<String> requestedCollections = new HashSet<>();
+      for (SolrCollection collection : relevantCollections) {
+        if (requestedCollections.add(collection.getName())) {
+          attributeFetcher.requestCollectionMetrics(collection, 
replicaMetrics);
         }
-        exception.addRejectedModification(
-            replica.toString(),
-            "co-located with replicas of " + 
colocatingNodes.get(replica.getNode()));
-      }
-      if (exception != null) {
-        throw exception;
       }
-    }
+      attributeFetcher.fetchFrom(nodes);
+      final AttributeValues attrValues = attributeFetcher.fetchAttributes();
 
-    private Set<String> getZonesFromNodes(Set<Node> nodes, final 
AttributeValues attrValues) {
-      Set<String> azs = new HashSet<>();
+      AffinityPlacementContext affinityPlacementContext = new 
AffinityPlacementContext();
+      affinityPlacementContext.doSpreadAcrossDomains = spreadAcrossDomains;
 
-      for (Node n : nodes) {
-        azs.add(getNodeAZ(n, attrValues));
+      Map<Node, WeightedNode> affinityNodeMap = 
CollectionUtil.newHashMap(nodes.size());
+      for (Node node : nodes) {
+        AffinityNode affinityNode =
+            newNodeFromMetrics(node, attrValues, affinityPlacementContext, 
skipNodesWithErrors);
+        if (affinityNode != null) {
+          affinityNodeMap.put(node, affinityNode);
+        }
       }
 
-      return Collections.unmodifiableSet(azs);
+      return affinityNodeMap;
     }
 
-    /**
-     * Resolves the AZ of a node and takes care of nodes that have no defined 
AZ in system property
-     * {@link AffinityPlacementConfig#AVAILABILITY_ZONE_SYSPROP} to then 
return {@link
-     * AffinityPlacementConfig#UNDEFINED_AVAILABILITY_ZONE} as the AZ name.
-     */
-    private String getNodeAZ(Node n, final AttributeValues attrValues) {
-      Optional<String> nodeAz =
-          attrValues.getSystemProperty(n, 
AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP);
-      // All nodes with undefined AZ will be considered part of the same AZ. 
This also works for
-      // deployments that do not care about AZ's
-      return 
nodeAz.orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
-    }
-
-    /**
-     * This class captures an availability zone and the nodes that are 
legitimate targets for
-     * replica placement in that Availability Zone. Instances are used as 
values in a {@link
-     * java.util.TreeMap} in which the total number of already existing 
replicas in the AZ is the
-     * key. This allows easily picking the set of nodes from which to select a 
node for placement in
-     * order to balance the number of replicas per AZ. Picking one of the 
nodes from the set is done
-     * using different criteria unrelated to the Availability Zone (picking 
the node is based on the
-     * {@link CoresAndDiskComparator} ordering).
-     */
-    private static class AzWithNodes {
-      final String azName;
-      private final boolean useSpreadDomains;
-      private boolean listIsSorted = false;
-      private final Comparator<Node> nodeComparator;
-      private final Random random;
-      private final List<Node> availableNodesForPlacement;
-      private final AttributeValues attributeValues;
-      private final ReplicaMetrics leaderMetrics;
-      private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains;
-      private final Map<String, Integer> currentSpreadDomainUsageUsage;
-      private int numNodesForPlacement;
-
-      AzWithNodes(
-          String azName,
-          List<Node> availableNodesForPlacement,
-          boolean useSpreadDomains,
-          Comparator<Node> nodeComparator,
-          Random random,
-          AttributeValues attributeValues,
-          ReplicaMetrics leaderMetrics,
-          Map<String, Integer> currentSpreadDomainUsageUsage) {
-        this.azName = azName;
-        this.availableNodesForPlacement = availableNodesForPlacement;
-        this.useSpreadDomains = useSpreadDomains;
-        this.nodeComparator = nodeComparator;
-        this.random = random;
-        this.attributeValues = attributeValues;
-        this.leaderMetrics = leaderMetrics;
-        this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage;
-        this.numNodesForPlacement = availableNodesForPlacement.size();
-      }
-
-      private boolean hasBeenSorted() {
-        return (useSpreadDomains && sortedSpreadDomains != null)
-            || (!useSpreadDomains && listIsSorted);
-      }
-
-      void ensureSorted() {
-        if (!hasBeenSorted()) {
-          sort();
-        }
+    AffinityNode newNodeFromMetrics(
+        Node node,
+        AttributeValues attrValues,
+        AffinityPlacementContext affinityPlacementContext,
+        boolean skipNodesWithErrors)
+        throws PlacementException {
+      Set<Replica.ReplicaType> supportedReplicaTypes =
+          attrValues.getSystemProperty(node, 
AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).stream()
+              .flatMap(s -> Arrays.stream(s.split(",")))
+              .map(String::trim)
+              .map(s -> s.toUpperCase(Locale.ROOT))
+              .map(
+                  s -> {
+                    try {
+                      return Replica.ReplicaType.valueOf(s);
+                    } catch (IllegalArgumentException e) {
+                      log.warn(
+                          "Node {} has an invalid value for the {} 
systemProperty: {}",
+                          node.getName(),
+                          AffinityPlacementConfig.REPLICA_TYPE_SYSPROP,
+                          s);
+                      return null;
+                    }
+                  })
+              .collect(Collectors.toSet());
+      if (supportedReplicaTypes.isEmpty()) {
+        // If property not defined or is only whitespace on a node, assuming 
node can take any
+        // replica type
+        supportedReplicaTypes = Set.of(Replica.ReplicaType.values());
       }
 
-      private void sort() {
-        assert !listIsSorted && sortedSpreadDomains == null
-            : "We shouldn't be sorting this list again";
-
-        // Make sure we do not tend to use always the same nodes (within an 
AZ) if all
-        // conditions are identical (well, this likely is not the case since 
after having added
-        // a replica to a node its number of cores increases for the next 
placement decision,
-        // but let's be defensive here, given that multiple concurrent 
placement decisions might
-        // see the same initial cluster state, and we want placement to be 
reasonable even in
-        // that case without creating an unnecessary imbalance). For example, 
if all nodes have
-        // 0 cores and same amount of free disk space, ideally we want to pick 
a random node for
-        // placement, not always the same one due to some internal ordering.
-        Collections.shuffle(availableNodesForPlacement, random);
-
-        if (useSpreadDomains) {
-          // When we use spread domains, we don't just sort the list of nodes, 
instead we generate a
-          // TreeSet of SpreadDomainWithNodes,
-          // sorted by the number of times the domain has been used. Each
-          // SpreadDomainWithNodes internally contains the list of nodes that 
belong to that
-          // particular domain,
-          // and it's sorted internally by the comparator passed to this
-          // class (which is the same that's used when not using spread 
domains).
-          // Whenever a node from a particular SpreadDomainWithNodes is 
selected as the best
-          // candidate, the call to "removeBestNode" will:
-          // 1. Remove the SpreadDomainWithNodes instance from the TreeSet
-          // 2. Remove the best node from the list within the 
SpreadDomainWithNodes
-          // 3. Increment the count of times the domain has been used
-          // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if 
there are still nodes
-          // available
-          HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new 
HashMap<>();
-          for (Node node : availableNodesForPlacement) {
-            spreadDomainToListOfNodesMap
-                .computeIfAbsent(
-                    attributeValues
-                        .getSystemProperty(node, 
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
-                        .get(),
-                    k -> new ArrayList<>())
-                .add(node);
-          }
-          sortedSpreadDomains =
-              new TreeSet<>(new 
SpreadDomainComparator(currentSpreadDomainUsageUsage));
-
-          int i = 0;
-          for (Map.Entry<String, List<Node>> entry : 
spreadDomainToListOfNodesMap.entrySet()) {
-            // Sort the nodes within the spread domain by the provided 
comparator
-            entry.getValue().sort(nodeComparator);
-            sortedSpreadDomains.add(
-                new SpreadDomainWithNodes(entry.getKey(), entry.getValue(), 
i++, nodeComparator));
+      Set<String> nodeType;
+      Optional<String> nodePropOpt =
+          attrValues.getSystemProperty(node, 
AffinityPlacementConfig.NODE_TYPE_SYSPROP);
+      if (nodePropOpt.isEmpty()) {
+        nodeType = Collections.emptySet();
+      } else {
+        nodeType = new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ','));
+      }
+
+      Optional<Double> nodeFreeDiskGB =
+          attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB);
+      Optional<Integer> nodeNumCores =
+          attrValues.getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES);
+      String az =
+          attrValues
+              .getSystemProperty(node, 
AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP)
+              .orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE);
+      affinityPlacementContext.allAvailabilityZones.add(az);
+      String spreadDomain;
+      if (affinityPlacementContext.doSpreadAcrossDomains) {
+        spreadDomain =
+            attrValues
+                .getSystemProperty(node, 
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
+                .orElse(null);
+        if (spreadDomain == null) {
+          if (log.isWarnEnabled()) {
+            log.warn(
+                "AffinityPlacementPlugin configured to spread across domains, 
but node {} does not have the {} system property. Ignoring 
spreadAcrossDomains.",
+                node.getName(),
+                AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP);
           }
+          affinityPlacementContext.doSpreadAcrossDomains = false;
+          affinityPlacementContext.allSpreadDomains.clear();
         } else {
-          availableNodesForPlacement.sort(nodeComparator);
-          listIsSorted = true;
+          affinityPlacementContext.allSpreadDomains.add(spreadDomain);
         }
+      } else {
+        spreadDomain = null;
       }
-
-      Node getBestNode() {
-        assert hasBeenSorted();
-        if (useSpreadDomains) {
-          return sortedSpreadDomains.first().sortedNodesForPlacement.get(0);
-        } else {
-          return availableNodesForPlacement.get(0);
+      if (nodeFreeDiskGB.isEmpty() && skipNodesWithErrors) {
+        if (log.isWarnEnabled()) {
+          log.warn(
+              "Unknown free disk on node {}, excluding it from placement 
decisions.",
+              node.getName());
         }
-      }
-
-      public Node removeBestNode() {
-        assert hasBeenSorted();
-        this.numNodesForPlacement--;
-        Node n;
-        if (useSpreadDomains) {
-          // Since this SpreadDomainWithNodes needs to be re-sorted in the 
sortedSpreadDomains, we
-          // remove it and then re-add it, once the best node has been removed.
-          SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst();
-          n = group.sortedNodesForPlacement.remove(0);
-          this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1, 
Integer::sum);
-          if (!group.sortedNodesForPlacement.isEmpty()) {
-            sortedSpreadDomains.add(group);
-          }
-        } else {
-          n = availableNodesForPlacement.remove(0);
+        return null;
+      } else if (nodeNumCores.isEmpty() && skipNodesWithErrors) {
+        if (log.isWarnEnabled()) {
+          log.warn(
+              "Unknown number of cores on node {}, excluding it from placement 
decisions.",
+              node.getName());
         }
-        Optional.ofNullable(leaderMetrics)
-            .flatMap(lrm -> 
lrm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB))
-            .ifPresent(
-                indexSize ->
-                    attributeValues.decreaseNodeMetric(
-                        n, BuiltInMetrics.NODE_FREE_DISK_GB, indexSize));
-        attributeValues.increaseNodeMetric(n, BuiltInMetrics.NODE_NUM_CORES, 
1);
-        return n;
-      }
-
-      public int numNodes() {
-        return this.numNodesForPlacement;
+        return null;
+      } else {
+        return new AffinityNode(
+            node,
+            attrValues,
+            affinityPlacementContext,
+            supportedReplicaTypes,
+            nodeType,
+            nodeNumCores.orElse(0),
+            nodeFreeDiskGB.orElse(0D),
+            az,
+            spreadDomain);
       }
     }
 
-    /**
-     * This class represents group of nodes with the same {@link
-     * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
-     */
-    static class SpreadDomainWithNodes implements 
Comparable<SpreadDomainWithNodes> {
+    private class AffinityNode extends WeightedNode {
 
-      /**
-       * This is the label that all nodes in this group have in {@link
-       * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label.
-       */
-      final String spreadDomainName;
+      private final AttributeValues attrValues;
 
-      /**
-       * The list of all nodes that contain the same {@link
-       * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be 
sorted before creating
-       * this class.
-       */
-      private final List<Node> sortedNodesForPlacement;
+      private final AffinityPlacementContext affinityPlacementContext;
 
-      /**
-       * This is used for tie breaking the sort of {@link 
SpreadDomainWithNodes}, when the
-       * nodeComparator between the top nodes of each group return 0.
-       */
-      private final int tieBreaker;
+      private final Set<Replica.ReplicaType> supportedReplicaTypes;
+      private final Set<String> nodeType;
 
-      /**
-       * This is the comparator that is used to compare the top nodes in the 
{@link
-       * #sortedNodesForPlacement} lists. Must be the same that was used to 
sort {@link
-       * #sortedNodesForPlacement}.
-       */
-      private final Comparator<Node> nodeComparator;
+      private int coresOnNode;
+      private double nodeFreeDiskGB;
 
-      public SpreadDomainWithNodes(
-          String spreadDomainName,
-          List<Node> sortedNodesForPlacement,
-          int tieBreaker,
-          Comparator<Node> nodeComparator) {
-        this.spreadDomainName = spreadDomainName;
-        this.sortedNodesForPlacement = sortedNodesForPlacement;
-        this.tieBreaker = tieBreaker;
-        this.nodeComparator = nodeComparator;
+      private final String availabilityZone;
+      private final String spreadDomain;
+
+      AffinityNode(
+          Node node,
+          AttributeValues attrValues,
+          AffinityPlacementContext affinityPlacementContext,
+          Set<Replica.ReplicaType> supportedReplicaTypes,
+          Set<String> nodeType,
+          int coresOnNode,
+          double nodeFreeDiskGB,
+          String az,
+          String spreadDomain) {
+        super(node);
+        this.attrValues = attrValues;
+        this.affinityPlacementContext = affinityPlacementContext;
+        this.supportedReplicaTypes = supportedReplicaTypes;
+        this.nodeType = nodeType;
+        this.coresOnNode = coresOnNode;
+        this.nodeFreeDiskGB = nodeFreeDiskGB;
+        this.availabilityZone = az;
+        this.spreadDomain = spreadDomain;
       }
 
       @Override
-      public int compareTo(SpreadDomainWithNodes o) {
-        if (o == this) {
-          return 0;
-        }
-        int result =
-            nodeComparator.compare(
-                this.sortedNodesForPlacement.get(0), 
o.sortedNodesForPlacement.get(0));
-        if (result == 0) {
-          return Integer.compare(this.tieBreaker, o.tieBreaker);
-        }
-        return result;
+      public int calcWeight() {
+        return coresOnNode
+            + 100 * (prioritizedFreeDiskGB > 0 && nodeFreeDiskGB < 
prioritizedFreeDiskGB ? 1 : 0)
+            + 10000 * getSpreadDomainWeight()
+            + 1000000 * getAZWeight();
       }
-    }
-
-    /**
-     * Builds the number of existing cores on each node returned in the 
attrValues. Nodes for which
-     * the number of cores is not available for whatever reason are excluded 
from acceptable
-     * candidate nodes as it would not be possible to make any meaningful 
placement decisions.
-     *
-     * @param nodes all nodes on which this plugin should compute placement
-     * @param attrValues attributes fetched for the nodes. This method uses 
system property {@link
-     *     AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number 
of cores on each
-     *     node.
-     */
-    private Map<Node, Integer> getCoreCountPerNode(
-        Set<Node> nodes, final AttributeValues attrValues) {
-      Map<Node, Integer> coresOnNodes = new HashMap<>();
 
-      for (Node node : nodes) {
-        attrValues
-            .getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES)
-            .ifPresent(count -> coresOnNodes.put(node, count));
+      @Override
+      public int calcRelevantWeightWithReplica(Replica replica) {
+        return coresOnNode
+            + 100
+                * (prioritizedFreeDiskGB > 0
+                        && nodeFreeDiskGB - getProjectedSizeOfReplica(replica)
+                            < prioritizedFreeDiskGB
+                    ? 1
+                    : 0)
+            + 10000 * projectReplicaSpreadWeight(replica)
+            + 1000000 * projectAZWeight(replica);
       }
 
-      return coresOnNodes;
-    }
-
-    /**
-     * Given the set of all nodes on which to do placement and fetched 
attributes, builds the sets
-     * representing candidate nodes for placement of replicas of each replica 
type. These sets are
-     * packaged and returned in an EnumMap keyed by replica type. Nodes for 
which the number of
-     * cores is not available for whatever reason are excluded from acceptable 
candidate nodes as it
-     * would not be possible to make any meaningful placement decisions.
-     *
-     * @param nodes all nodes on which this plugin should compute placement
-     * @param attrValues attributes fetched for the nodes. This method uses 
system property {@link
-     *     AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number 
of cores on each
-     *     node.
-     */
-    private EnumMap<Replica.ReplicaType, Set<Node>> 
getAvailableNodesForReplicaTypes(
-        Set<Node> nodes, final AttributeValues attrValues, final 
ReplicaMetrics leaderMetrics) {
-      EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes =
-          new EnumMap<>(Replica.ReplicaType.class);
-
-      for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
-        replicaTypeToNodes.put(replicaType, new HashSet<>());
+      @Override
+      public boolean canAddReplica(Replica replica) {
+        String collection = replica.getShard().getCollection().getName();
+        return
+        // By default, do not allow two replicas of the same shard on a node
+        super.canAddReplica(replica)
+            && supportedReplicaTypes.contains(replica.getType())
+            && Optional.ofNullable(nodeTypes.get(collection))
+                .map(s -> s.stream().anyMatch(nodeType::contains))
+                .orElse(true)
+            && Optional.ofNullable(withCollections.get(collection))
+                .map(this::hasCollectionOnNode)
+                .orElse(true)
+            && (minimalFreeDiskGB <= 0
+                || nodeFreeDiskGB - getProjectedSizeOfReplica(replica) > 
minimalFreeDiskGB);
       }
 
-      for (Node node : nodes) {
-        // Exclude nodes with unknown or too small disk free space
-        Optional<Double> nodeFreeDiskGB =
-            attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB);
-        if (nodeFreeDiskGB.isEmpty()) {
-          if (log.isWarnEnabled()) {
-            log.warn(
-                "Unknown free disk on node {}, excluding it from placement 
decisions.",
-                node.getName());
-          }
-          // We rely later on the fact that the free disk optional is present 
(see
-          // CoresAndDiskComparator), be careful it you change anything here.
-          continue;
-        }
-        double replicaIndexSize =
-            Optional.ofNullable(leaderMetrics)
-                .flatMap(lm -> 
lm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB))
-                .orElse(0D);
-        double projectedFreeDiskIfPlaced =
-            BuiltInMetrics.NODE_FREE_DISK_GB.decrease(nodeFreeDiskGB.get(), 
replicaIndexSize);
-        if (projectedFreeDiskIfPlaced < minimalFreeDiskGB) {
-          if (log.isWarnEnabled()) {
-            log.warn(
-                "Node {} free disk ({}GB) minus the projected replica size 
({}GB) is lower than configured"
-                    + " minimum {}GB, excluding it from placement decisions.",
-                node.getName(),
-                nodeFreeDiskGB.get(),
-                replicaIndexSize,
-                minimalFreeDiskGB);
-          }
-          continue;
-        }
-
-        if (attrValues.getNodeMetric(node, 
BuiltInMetrics.NODE_NUM_CORES).isEmpty()) {
-          if (log.isWarnEnabled()) {
-            log.warn(
-                "Unknown number of cores on node {}, excluding it from 
placement decisions.",
-                node.getName());
+      @Override
+      public Map<Replica, String> canRemoveReplicas(Collection<Replica> 
replicas) {
+        Map<Replica, String> replicaRemovalExceptions = new HashMap<>();
+        Map<String, Map<String, Set<Replica>>> removals = new HashMap<>();
+        for (Replica replica : replicas) {
+          SolrCollection collection = replica.getShard().getCollection();
+          Set<String> collocatedCollections = new HashSet<>();
+          Optional.ofNullable(collocatedWith.get(collection.getName()))
+              .ifPresent(collocatedCollections::addAll);
+          collocatedCollections.retainAll(getCollectionsOnNode());
+          if (collocatedCollections.isEmpty()) {
+            continue;
           }
-          // We rely later on the fact that the number of cores optional is 
present (see
-          // CoresAndDiskComparator), be careful if you change anything here.
-          continue;
-        }
 
-        String supportedReplicaTypes =
-            attrValues
-                    .getSystemProperty(node, 
AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
-                    .isPresent()
-                ? attrValues
-                    .getSystemProperty(node, 
AffinityPlacementConfig.REPLICA_TYPE_SYSPROP)
-                    .get()
-                : null;
-        // If property not defined or is only whitespace on a node, assuming 
node can take any
-        // replica type
-        if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) {
-          for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-            replicaTypeToNodes.get(rt).add(node);
-          }
-        } else {
-          Set<String> acceptedTypes =
-              Arrays.stream(supportedReplicaTypes.split(","))
-                  .map(String::trim)
-                  .map(s -> s.toLowerCase(Locale.ROOT))
-                  .collect(Collectors.toSet());
-          for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
-            if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) {
-              replicaTypeToNodes.get(rt).add(node);
-            }
+          // There are collocatedCollections for this shard, so make sure 
there is a replica of this
+          // shard left on the node after it is removed
+          Set<Replica> replicasRemovedForShard =
+              removals
+                  .computeIfAbsent(
+                      replica.getShard().getCollection().getName(), k -> new 
HashMap<>())
+                  .computeIfAbsent(replica.getShard().getShardName(), k -> new 
HashSet<>());
+          replicasRemovedForShard.add(replica);
+
+          if (replicasRemovedForShard.size()
+              >= getReplicasForShardOnNode(replica.getShard()).size()) {
+            replicaRemovalExceptions.put(
+                replica, "co-located with replicas of " + 
collocatedCollections);
           }
         }
+        return replicaRemovalExceptions;
       }
-      return replicaTypeToNodes;
-    }
 
-    /**
-     * Picks nodes from {@code targetNodes} for placing {@code numReplicas} 
replicas.
-     *
-     * <p>The criteria used in this method are, in this order:
-     *
-     * <ol>
-     *   <li>No more than one replica of a given shard on a given node 
(strictly enforced)
-     *   <li>Balance as much as possible replicas of a given {@link
-     *       org.apache.solr.cluster.Replica.ReplicaType} over available AZ's. 
This balancing takes
-     *       into account existing replicas <b>of the corresponding replica 
type</b>, if any.
-     *   <li>Place replicas if possible on nodes having more than a certain 
amount of free disk
-     *       space (note that nodes with a too small amount of free disk space 
were eliminated as
-     *       placement targets earlier, in {@link 
#getAvailableNodesForReplicaTypes(Set,
-     *       AttributeValues, ReplicaMetrics)}). There's a threshold here 
rather than sorting on the
-     *       amount of free disk space, because sorting on that value would in 
practice lead to
-     *       never considering the number of cores on a node.
-     *   <li>Place replicas on nodes having a smaller number of cores (the 
number of cores
-     *       considered for this decision includes previous placement 
decisions made during the
-     *       processing of the placement request)
-     * </ol>
-     */
-    @SuppressForbidden(
-        reason =
-            "Ordering.arbitrary() has no equivalent in Comparator class. 
Rather reuse than copy.")
-    private void makePlacementDecisions(
-        SolrCollection solrCollection,
-        String shardName,
-        Set<String> availabilityZones,
-        Replica.ReplicaType replicaType,
-        int numReplicas,
-        final AttributeValues attrValues,
-        final ReplicaMetrics leaderMetrics,
-        EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes,
-        Set<Node> nodesWithReplicas,
-        Map<Node, Integer> coresOnNodes,
-        PlacementPlanFactory placementPlanFactory,
-        Set<ReplicaPlacement> replicaPlacements,
-        boolean doSpreadAcrossDomains)
-        throws PlacementException {
-      // Count existing replicas per AZ. We count only instances of the type 
of replica for which we
-      // need to do placement. If we ever want to balance replicas of any type 
across AZ's (and not
-      // each replica type balanced independently), we'd have to move this 
data structure to the
-      // caller of this method so it can be reused across different replica 
type placements for a
-      // given shard. Note then that this change would be risky. For example 
all NRT's and PULL
-      // replicas for a shard may be correctly balanced over three AZ's, but 
then all NRT can end up
-      // in the same AZ...
-      Map<String, Integer> azToNumReplicas = new HashMap<>();
-      for (String az : availabilityZones) {
-        azToNumReplicas.put(az, 0);
+      @Override
+      protected boolean addProjectedReplicaWeights(Replica replica) {
+        nodeFreeDiskGB -= getProjectedSizeOfReplica(replica);
+        coresOnNode += 1;
+        return addReplicaToAzAndSpread(replica);
       }
 
-      // Build the set of candidate nodes for the placement, i.e. nodes that 
can accept the replica
-      // type
-      Set<Node> candidateNodes = new 
HashSet<>(replicaTypeToNodes.get(replicaType));
-      // Remove nodes that already have a replica for the shard (no two 
replicas of same shard can
-      // be put on same node)
-      candidateNodes.removeAll(nodesWithReplicas);
+      @Override
+      protected void initReplicaWeights(Replica replica) {
+        addReplicaToAzAndSpread(replica);
+      }
 
-      // This Map will include the affinity labels for the nodes that are 
currently hosting replicas
-      // of this shard. It will be modified with new placement decisions.
-      Map<String, Integer> spreadDomainsInUse = new HashMap<>();
-      Shard shard = solrCollection.getShard(shardName);
-      if (shard != null) {
-        // shard is non null if we're adding replicas to an already existing 
collection.
-        // If we're creating the collection, the shards do not exist yet.
-        for (Replica replica : shard.replicas()) {
-          // The node's AZ is counted as having a replica if it has a replica 
of the same type as
-          // the one we need to place here.
-          if (replica.getType() == replicaType) {
-            final String az = getNodeAZ(replica.getNode(), attrValues);
-            if (azToNumReplicas.containsKey(az)) {
-              // We do not count replicas on AZ's for which we don't have any 
node to place on
-              // because it would not help the placement decision. If we did 
want to do that, note
-              // the dereferencing below can't be assumed as the entry will 
not exist in the map.
-              azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
-            }
-            if (doSpreadAcrossDomains) {
-              attrValues
-                  .getSystemProperty(
-                      replica.getNode(), 
AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP)
-                  .ifPresent(nodeDomain -> 
spreadDomainsInUse.merge(nodeDomain, 1, Integer::sum));
-            }
-          }
+      private boolean addReplicaToAzAndSpread(Replica replica) {
+        boolean needsResort = false;
+        needsResort |=

Review Comment:
   you can remove the default value in the first line
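   i.e. (just a sketch; `firstCondition`/`secondCondition` stand in for the actual 
expressions that follow in this method):
   ```java
   // instead of initializing to false and immediately OR-ing into it:
   //   boolean needsResort = false;
   //   needsResort |= firstCondition;
   // the first expression can be assigned directly:
   boolean needsResort = firstCondition;
   // later conditions can still accumulate with |=
   needsResort |= secondCondition;
   ```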



##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicaMigrationUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Code to migrate replicas to already chosen nodes. This will create new 
replicas, and delete the
+   * old replicas after the creation is done.
+   *
+   * <p>If an error occurs during the creation of new replicas, all new 
replicas will be deleted.
+   *
+   * @param ccc The collection command context to use from the API that calls 
this method
+   * @param movements a map from replica to the new node that the replica 
should live on
+   * @param parallel whether the replica creations should be done in parallel
+   * @param waitForFinalState wait for the final state of all newly created 
replicas before
+   *     continuing
+   * @param timeout the amount of time to wait for new replicas to be created
+   * @param asyncId If provided, the command will be run under the given 
asyncId
+   * @param results push results (successful and failure) onto this list
+   * @return whether the command was successful
+   */
+  static boolean migrateReplicas(
+      CollectionCommandContext ccc,
+      Map<Replica, String> movements,
+      boolean parallel,
+      boolean waitForFinalState,
+      int timeout,
+      String asyncId,
+      NamedList<Object> results)
+      throws IOException, InterruptedException, KeeperException {
+    // how many leaders are we moving? for these replicas we have to make sure 
that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can 
become the new leader)
+    // If waitForFinalState=true we wait for all replicas
+    int numLeaders = 0;
+    for (Replica replica : movements.keySet()) {
+      if (replica.isLeader() || waitForFinalState) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
+    List<ZkNodeProps> createdReplicas = new ArrayList<>();
+
+    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
+    SolrCloseableLatch countDownLatch =
+        new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
+
+    SolrCloseableLatch replicasToRecover =
+        new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
+
+    ClusterState clusterState = ccc.getZkStateReader().getClusterState();
+
+    for (Map.Entry<Replica, String> movement : movements.entrySet()) {
+      Replica sourceReplica = movement.getKey();
+      String targetNode = movement.getValue();
+      String sourceCollection = sourceReplica.getCollection();
+      if (log.isInfoEnabled()) {
+        log.info(
+            "Going to create replica for collection={} shard={} on node={}",
+            sourceCollection,
+            sourceReplica.getShard(),
+            targetNode);
+      }
+
+      ZkNodeProps msg =
+          sourceReplica
+              .toFullProps()
+              .plus("parallel", String.valueOf(parallel))
+              .plus(CoreAdminParams.NODE, targetNode);
+      if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
+      NamedList<Object> nl = new NamedList<>();
+      final ZkNodeProps addedReplica =
+          new AddReplicaCmd(ccc)
+              .addReplica(
+                  clusterState,
+                  msg,
+                  nl,
+                  () -> {
+                    countDownLatch.countDown();

Review Comment:
   Should this happen at the end of the method instead of the beginning? 
Otherwise there is a race condition with `anyOneFailed`, I think?
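   For example, something along these lines (sketch only; the body of the 
callback stays as in the PR, only the countDown moves):
   ```java
   () -> {
     try {
       if (nl.get("failure") != null) {
         // ... log the failure, add it to results, set anyOneFailed ...
       } else {
         // ... debug logging for the successful creation ...
       }
     } finally {
       // count down only after anyOneFailed/results have been updated
       countDownLatch.countDown();
     }
   }
   ```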



##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicaMigrationUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Code to migrate replicas to already chosen nodes. This will create new 
replicas, and delete the
+   * old replicas after the creation is done.
+   *
+   * <p>If an error occurs during the creation of new replicas, all new 
replicas will be deleted.
+   *
+   * @param ccc The collection command context to use from the API that calls 
this method
+   * @param movements a map from replica to the new node that the replica 
should live on
+   * @param parallel whether the replica creations should be done in parallel
+   * @param waitForFinalState wait for the final state of all newly created 
replicas before
+   *     continuing
+   * @param timeout the amount of time to wait for new replicas to be created
+   * @param asyncId If provided, the command will be run under the given 
asyncId
+   * @param results push results (successful and failure) onto this list
+   * @return whether the command was successful
+   */
+  static boolean migrateReplicas(
+      CollectionCommandContext ccc,
+      Map<Replica, String> movements,
+      boolean parallel,
+      boolean waitForFinalState,
+      int timeout,
+      String asyncId,
+      NamedList<Object> results)
+      throws IOException, InterruptedException, KeeperException {
+    // how many leaders are we moving? for these replicas we have to make sure 
that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can 
become the new leader)
+    // If waitForFinalState=true we wait for all replicas
+    int numLeaders = 0;
+    for (Replica replica : movements.keySet()) {
+      if (replica.isLeader() || waitForFinalState) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
+    List<ZkNodeProps> createdReplicas = new ArrayList<>();
+
+    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
+    SolrCloseableLatch countDownLatch =
+        new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
+
+    SolrCloseableLatch replicasToRecover =
+        new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
+
+    ClusterState clusterState = ccc.getZkStateReader().getClusterState();
+
+    for (Map.Entry<Replica, String> movement : movements.entrySet()) {
+      Replica sourceReplica = movement.getKey();
+      String targetNode = movement.getValue();
+      String sourceCollection = sourceReplica.getCollection();
+      if (log.isInfoEnabled()) {
+        log.info(
+            "Going to create replica for collection={} shard={} on node={}",
+            sourceCollection,
+            sourceReplica.getShard(),
+            targetNode);
+      }
+
+      ZkNodeProps msg =
+          sourceReplica
+              .toFullProps()
+              .plus("parallel", String.valueOf(parallel))
+              .plus(CoreAdminParams.NODE, targetNode);
+      if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
+      NamedList<Object> nl = new NamedList<>();
+      final ZkNodeProps addedReplica =
+          new AddReplicaCmd(ccc)
+              .addReplica(
+                  clusterState,
+                  msg,
+                  nl,
+                  () -> {
+                    countDownLatch.countDown();
+                    if (nl.get("failure") != null) {
+                      String errorString =
+                          String.format(
+                              Locale.ROOT,
+                              "Failed to create replica for collection=%s 
shard=%s" + " on node=%s",
+                              sourceCollection,
+                              sourceReplica.getShard(),
+                              targetNode);
+                      log.warn(errorString);
+                      // one replica creation failed. Make the best attempt to
+                      // delete all the replicas created so far in the target
+                      // and exit
+                      synchronized (results) {
+                        results.add("failure", errorString);
+                        anyOneFailed.set(true);
+                      }
+                    } else {
+                      if (log.isDebugEnabled()) {
+                        log.debug(
+                            "Successfully created replica for collection={} 
shard={} on node={}",
+                            sourceCollection,
+                            sourceReplica.getShard(),
+                            targetNode);
+                      }
+                    }
+                  })
+              .get(0);
+
+      if (addedReplica != null) {
+        createdReplicas.add(addedReplica);
+        if (sourceReplica.isLeader() || waitForFinalState) {
+          String shardName = sourceReplica.getShard();
+          String replicaName = sourceReplica.getName();
+          String key = sourceCollection + "_" + replicaName;
+          CollectionStateWatcher watcher;
+          if (waitForFinalState) {
+            watcher =
+                new ActiveReplicaWatcher(
+                    sourceCollection,
+                    null,
+                    
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
+                    replicasToRecover);
+          } else {
+            watcher =
+                new LeaderRecoveryWatcher(
+                    sourceCollection,
+                    shardName,
+                    replicaName,
+                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
+                    replicasToRecover);
+          }
+          watchers.put(key, watcher);
+          log.debug("--- adding {}, {}", key, watcher);
+          
ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, 
watcher);
+        } else {
+          log.debug("--- not waiting for {}", addedReplica);
+        }
+      }
+    }
+
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
+    }
+
+    // now wait for leader replicas to recover
+    log.debug("Waiting for {} leader replicas to recover", numLeaders);
+    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {

Review Comment:
   Do we need to wait for this if `anyOneFailed` is `true`?
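   e.g. (sketch, reusing the variables already in scope in this method):
   ```java
   // skip waiting for leader recovery if replica creation already failed
   if (!anyOneFailed.get()) {
     log.debug("Waiting for {} leader replicas to recover", numLeaders);
     if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
       if (log.isInfoEnabled()) {
         log.info(
             "Timed out waiting for {} leader replicas to recover",
             replicasToRecover.getCount());
       }
       anyOneFailed.set(true);
     } else {
       log.debug("Finished waiting for leader replicas to recover");
     }
   }
   ```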



##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicaMigrationUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Code to migrate replicas to already chosen nodes. This will create new 
replicas, and delete the
+   * old replicas after the creation is done.
+   *
+   * <p>If an error occurs during the creation of new replicas, all new 
replicas will be deleted.
+   *
+   * @param ccc The collection command context to use from the API that calls 
this method
+   * @param movements a map from replica to the new node that the replica 
should live on
+   * @param parallel whether the replica creations should be done in parallel
+   * @param waitForFinalState wait for the final state of all newly created 
replicas before
+   *     continuing
+   * @param timeout the amount of time to wait for new replicas to be created
+   * @param asyncId If provided, the command will be run under the given 
asyncId
+   * @param results push results (successful and failure) onto this list
+   * @return whether the command was successful
+   */
+  static boolean migrateReplicas(
+      CollectionCommandContext ccc,
+      Map<Replica, String> movements,
+      boolean parallel,
+      boolean waitForFinalState,
+      int timeout,
+      String asyncId,
+      NamedList<Object> results)
+      throws IOException, InterruptedException, KeeperException {
+    // how many leaders are we moving? for these replicas we have to make sure 
that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can 
become the new leader)
+    // If waitForFinalState=true we wait for all replicas
+    int numLeaders = 0;
+    for (Replica replica : movements.keySet()) {
+      if (replica.isLeader() || waitForFinalState) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
+    List<ZkNodeProps> createdReplicas = new ArrayList<>();
+
+    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
+    SolrCloseableLatch countDownLatch =
+        new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
+
+    SolrCloseableLatch replicasToRecover =
+        new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
+
+    ClusterState clusterState = ccc.getZkStateReader().getClusterState();
+
+    for (Map.Entry<Replica, String> movement : movements.entrySet()) {
+      Replica sourceReplica = movement.getKey();
+      String targetNode = movement.getValue();
+      String sourceCollection = sourceReplica.getCollection();
+      if (log.isInfoEnabled()) {
+        log.info(
+            "Going to create replica for collection={} shard={} on node={}",
+            sourceCollection,
+            sourceReplica.getShard(),
+            targetNode);
+      }
+
+      ZkNodeProps msg =
+          sourceReplica
+              .toFullProps()
+              .plus("parallel", String.valueOf(parallel))
+              .plus(CoreAdminParams.NODE, targetNode);
+      if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
+      NamedList<Object> nl = new NamedList<>();
+      final ZkNodeProps addedReplica =
+          new AddReplicaCmd(ccc)
+              .addReplica(
+                  clusterState,
+                  msg,
+                  nl,
+                  () -> {
+                    countDownLatch.countDown();
+                    if (nl.get("failure") != null) {
+                      String errorString =
+                          String.format(
+                              Locale.ROOT,
+                              "Failed to create replica for collection=%s 
shard=%s" + " on node=%s",
+                              sourceCollection,
+                              sourceReplica.getShard(),
+                              targetNode);
+                      log.warn(errorString);
+                      // one replica creation failed. Make the best attempt to
+                      // delete all the replicas created so far in the target
+                      // and exit
+                      synchronized (results) {
+                        results.add("failure", errorString);
+                        anyOneFailed.set(true);
+                      }
+                    } else {
+                      if (log.isDebugEnabled()) {
+                        log.debug(
+                            "Successfully created replica for collection={} 
shard={} on node={}",
+                            sourceCollection,
+                            sourceReplica.getShard(),
+                            targetNode);
+                      }
+                    }
+                  })
+              .get(0);
+
+      if (addedReplica != null) {
+        createdReplicas.add(addedReplica);
+        if (sourceReplica.isLeader() || waitForFinalState) {
+          String shardName = sourceReplica.getShard();
+          String replicaName = sourceReplica.getName();
+          String key = sourceCollection + "_" + replicaName;
+          CollectionStateWatcher watcher;
+          if (waitForFinalState) {
+            watcher =
+                new ActiveReplicaWatcher(
+                    sourceCollection,
+                    null,
+                    
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
+                    replicasToRecover);
+          } else {
+            watcher =
+                new LeaderRecoveryWatcher(
+                    sourceCollection,
+                    shardName,
+                    replicaName,
+                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
+                    replicasToRecover);
+          }
+          watchers.put(key, watcher);
+          log.debug("--- adding {}, {}", key, watcher);
+          
ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, 
watcher);
+        } else {
+          log.debug("--- not waiting for {}", addedReplica);
+        }
+      }
+    }
+
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
+    }
+
+    // now wait for leader replicas to recover
+    log.debug("Waiting for {} leader replicas to recover", numLeaders);
+    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+      if (log.isInfoEnabled()) {
+        log.info(
+            "Timed out waiting for {} leader replicas to recover", 
replicasToRecover.getCount());
+      }
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for leader replicas to recover");
+    }
+    // remove the watchers, we're done either way
+    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
+      ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), 
e.getValue());
+    }
+    if (anyOneFailed.get()) {
+      log.info("Failed to create some replicas. Cleaning up all replicas on 
target node");
+      SolrCloseableLatch cleanupLatch =
+          new SolrCloseableLatch(createdReplicas.size(), 
ccc.getCloseableToLatchOn());
+      for (ZkNodeProps createdReplica : createdReplicas) {
+        NamedList<Object> deleteResult = new NamedList<>();
+        try {
+          new DeleteReplicaCmd(ccc)
+              .deleteReplica(
+                  ccc.getZkStateReader().getClusterState(),
+                  createdReplica.plus("parallel", "true"),
+                  deleteResult,
+                  () -> {
+                    cleanupLatch.countDown();
+                    if (deleteResult.get("failure") != null) {
+                      synchronized (results) {
+                        results.add(
+                            "failure",
+                            "Could not cleanup, because of : " + 
deleteResult.get("failure"));
+                      }
+                    }
+                  });
+        } catch (KeeperException e) {
+          cleanupLatch.countDown();
+          log.warn("Error deleting replica ", e);
+        } catch (Exception e) {
+          log.warn("Error deleting replica ", e);
+          cleanupLatch.countDown();
+          throw e;
+        }
+      }
+      cleanupLatch.await(5, TimeUnit.MINUTES);
+      return false;
+    }
+
+    // we have reached this far, meaning all replicas should have been 
recreated.
+    // now cleanup the original replicas
+    return cleanupReplicas(
+        results, ccc.getZkStateReader().getClusterState(), movements.keySet(), 
ccc, asyncId);
+  }
+
+  static boolean cleanupReplicas(
+      NamedList<Object> results,
+      ClusterState clusterState,
+      Collection<Replica> sourceReplicas,
+      CollectionCommandContext ccc,
+      String async)
+      throws IOException, InterruptedException {
+    CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());

Review Comment:
   We don't use `SolrCloseableLatch` here?
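   If it should match `migrateReplicas` above, presumably something like (sketch; 
it would also need the CollectionCommandContext's closeable):
   ```java
   SolrCloseableLatch cleanupLatch =
       new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
   ```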



##########
solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java:
##########
@@ -251,1006 +231,463 @@ private AffinityPlacementPlugin(
 
       // We make things reproducible in tests by using test seed if any
       String seed = System.getProperty("tests.seed");

Review Comment:
   Leftover?



##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicaMigrationUtils {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Code to migrate replicas to already chosen nodes. This will create new 
replicas, and delete the
+   * old replicas after the creation is done.
+   *
+   * <p>If an error occurs during the creation of new replicas, all new 
replicas will be deleted.
+   *
+   * @param ccc The collection command context to use from the API that calls 
this method
+   * @param movements a map from replica to the new node that the replica 
should live on
+   * @param parallel whether the replica creations should be done in parallel
+   * @param waitForFinalState wait for the final state of all newly created 
replicas before
+   *     continuing
+   * @param timeout the amount of time to wait for new replicas to be created
+   * @param asyncId If provided, the command will be run under the given 
asyncId
+   * @param results push results (successful and failure) onto this list
+   * @return whether the command was successful
+   */
+  static boolean migrateReplicas(
+      CollectionCommandContext ccc,
+      Map<Replica, String> movements,
+      boolean parallel,
+      boolean waitForFinalState,
+      int timeout,
+      String asyncId,
+      NamedList<Object> results)
+      throws IOException, InterruptedException, KeeperException {
+    // how many leaders are we moving? for these replicas we have to make sure 
that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can 
become the new leader)
+    // If waitForFinalState=true we wait for all replicas
+    int numLeaders = 0;
+    for (Replica replica : movements.keySet()) {
+      if (replica.isLeader() || waitForFinalState) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
+    List<ZkNodeProps> createdReplicas = new ArrayList<>();
+
+    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
+    SolrCloseableLatch countDownLatch =
+        new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn());
+
+    SolrCloseableLatch replicasToRecover =
+        new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
+
+    ClusterState clusterState = ccc.getZkStateReader().getClusterState();
+
+    for (Map.Entry<Replica, String> movement : movements.entrySet()) {
+      Replica sourceReplica = movement.getKey();
+      String targetNode = movement.getValue();
+      String sourceCollection = sourceReplica.getCollection();
+      if (log.isInfoEnabled()) {
+        log.info(
+            "Going to create replica for collection={} shard={} on node={}",
+            sourceCollection,
+            sourceReplica.getShard(),
+            targetNode);
+      }
+
+      ZkNodeProps msg =
+          sourceReplica
+              .toFullProps()
+              .plus("parallel", String.valueOf(parallel))
+              .plus(CoreAdminParams.NODE, targetNode);
+      if (asyncId != null) msg.getProperties().put(ASYNC, asyncId);
+      NamedList<Object> nl = new NamedList<>();
+      final ZkNodeProps addedReplica =
+          new AddReplicaCmd(ccc)
+              .addReplica(
+                  clusterState,
+                  msg,
+                  nl,
+                  () -> {
+                    countDownLatch.countDown();
+                    if (nl.get("failure") != null) {
+                      String errorString =
+                          String.format(
+                              Locale.ROOT,
+                              "Failed to create replica for collection=%s 
shard=%s" + " on node=%s",

Review Comment:
   No need to have two strings here
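   i.e. a single literal would do (sketch):
   ```java
   String errorString =
       String.format(
           Locale.ROOT,
           "Failed to create replica for collection=%s shard=%s on node=%s",
           sourceCollection,
           sourceReplica.getShard(),
           targetNode);
   ```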



##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java:
##########
@@ -135,190 +100,35 @@ public void call(ClusterState state, ZkNodeProps 
message, NamedList<Object> resu
         assignRequests.add(assignRequest);
       }
       Assign.AssignStrategy assignStrategy = 
Assign.createAssignStrategy(ccc.getCoreContainer());
-      replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), 
assignRequests);
-    }
-    int replicaPositionIdx = 0;
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Going to create replica for collection={} shard={} on node={}",
-            sourceCollection,
-            sourceReplica.getStr(SHARD_ID_PROP),
-            target);
-      }
-      String targetNode;
-      // Use the assigned replica positions, if target is null or empty 
(checked above)
-      if (replicaPositions != null) {
-        targetNode = replicaPositions.get(replicaPositionIdx).node;
-        replicaPositionIdx++;
-      } else {
-        targetNode = target;
-      }
-      ZkNodeProps msg =
-          sourceReplica
-              .plus("parallel", String.valueOf(parallel))
-              .plus(CoreAdminParams.NODE, targetNode);
-      if (async != null) msg.getProperties().put(ASYNC, async);
-      NamedList<Object> nl = new NamedList<>();
-      final ZkNodeProps addedReplica =
-          new AddReplicaCmd(ccc)
-              .addReplica(
-                  clusterState,
-                  msg,
-                  nl,
-                  () -> {
-                    countDownLatch.countDown();
-                    if (nl.get("failure") != null) {
-                      String errorString =
-                          String.format(
-                              Locale.ROOT,
-                              "Failed to create replica for collection=%s 
shard=%s" + " on node=%s",
-                              sourceCollection,
-                              sourceReplica.getStr(SHARD_ID_PROP),
-                              target);
-                      log.warn(errorString);
-                      // one replica creation failed. Make the best attempt to
-                      // delete all the replicas created so far in the target
-                      // and exit
-                      synchronized (results) {
-                        results.add("failure", errorString);
-                        anyOneFailed.set(true);
-                      }
-                    } else {
-                      if (log.isDebugEnabled()) {
-                        log.debug(
-                            "Successfully created replica for collection={} 
shard={} on node={}",
-                            sourceCollection,
-                            sourceReplica.getStr(SHARD_ID_PROP),
-                            target);
-                      }
-                    }
-                  })
-              .get(0);
-
-      if (addedReplica != null) {
-        createdReplicas.add(addedReplica);
-        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || 
waitForFinalState) {
-          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-          String replicaName = 
sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-          String collectionName = sourceCollection;
-          String key = collectionName + "_" + replicaName;
-          CollectionStateWatcher watcher;
-          if (waitForFinalState) {
-            watcher =
-                new ActiveReplicaWatcher(
-                    collectionName,
-                    null,
-                    
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
-                    replicasToRecover);
-          } else {
-            watcher =
-                new LeaderRecoveryWatcher(
-                    collectionName,
-                    shardName,
-                    replicaName,
-                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
-                    replicasToRecover);
-          }
-          watchers.put(key, watcher);
-          log.debug("--- adding {}, {}", key, watcher);
-          zkStateReader.registerCollectionStateWatcher(collectionName, 
watcher);
-        } else {
-          log.debug("--- not waiting for {}", addedReplica);
-        }
+      List<ReplicaPosition> replicaPositions =
+          assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
+      int position = 0;
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, 
replicaPositions.get(position++).node);
       }
-    }
-
-    log.debug("Waiting for replicas to be added");
-    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for replicas to be added");
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for replicas to be added");
-    }
-
-    // now wait for leader replicas to recover
-    log.debug("Waiting for {} leader replicas to recover", numLeaders);
-    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Timed out waiting for {} leader replicas to recover", 
replicasToRecover.getCount());
-      }
-      anyOneFailed.set(true);
     } else {
-      log.debug("Finished waiting for leader replicas to recover");
-    }
-    // remove the watchers, we're done either way
-    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
-      zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
-    }
-    if (anyOneFailed.get()) {
-      log.info("Failed to create some replicas. Cleaning up all replicas on 
target node");
-      SolrCloseableLatch cleanupLatch =
-          new SolrCloseableLatch(createdReplicas.size(), 
ccc.getCloseableToLatchOn());
-      for (ZkNodeProps createdReplica : createdReplicas) {
-        NamedList<Object> deleteResult = new NamedList<>();
-        try {
-          new DeleteReplicaCmd(ccc)
-              .deleteReplica(
-                  zkStateReader.getClusterState(),
-                  createdReplica.plus("parallel", "true"),
-                  deleteResult,
-                  () -> {
-                    cleanupLatch.countDown();
-                    if (deleteResult.get("failure") != null) {
-                      synchronized (results) {
-                        results.add(
-                            "failure",
-                            "Could not cleanup, because of : " + 
deleteResult.get("failure"));
-                      }
-                    }
-                  });
-        } catch (KeeperException e) {
-          cleanupLatch.countDown();
-          log.warn("Error deleting replica ", e);
-        } catch (Exception e) {
-          log.warn("Error deleting replica ", e);
-          cleanupLatch.countDown();
-          throw e;
-        }
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, target);
       }
-      cleanupLatch.await(5, TimeUnit.MINUTES);
-      return;
     }
 
-    // we have reached this far means all replicas could be recreated
-    // now cleanup the replicas in the source node
-    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, 
async);
-    results.add(
-        "success",
-        "REPLACENODE action completed successfully from  : " + source + " to : 
" + target);
+    boolean migrationSuccessful =
+        ReplicaMigrationUtils.migrateReplicas(
+            ccc, replicaMovements, parallel, waitForFinalState, timeout, 
async, results);
+    if (migrationSuccessful) {
+      results.add(
+          "success",
+          "REPLACENODE action completed successfully from  : " + source + " to 
: " + target);

Review Comment:
   I know it's not new, but this log line is misleading in the async case (and 
maybe with waitForFinalState=false too). Should we at least also include the 
value of async? Or maybe have something like:
   ```
   "REPLACENODE action " + (async == null ? "completed": "submitted") + " 
successfully from : "
   ```
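   which in context might read something like (sketch, keeping the rest of the 
message as-is):
   ```java
   results.add(
       "success",
       "REPLACENODE action "
           + (async == null ? "completed" : "submitted")
           + " successfully from : " + source + " to : " + target);
   ```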



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

