HoustonPutman commented on code in PR #1650:
URL: https://github.com/apache/solr/pull/1650#discussion_r1224394581


##########
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java:
##########
@@ -135,190 +100,35 @@ public void call(ClusterState state, ZkNodeProps 
message, NamedList<Object> resu
         assignRequests.add(assignRequest);
       }
       Assign.AssignStrategy assignStrategy = 
Assign.createAssignStrategy(ccc.getCoreContainer());
-      replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), 
assignRequests);
-    }
-    int replicaPositionIdx = 0;
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Going to create replica for collection={} shard={} on node={}",
-            sourceCollection,
-            sourceReplica.getStr(SHARD_ID_PROP),
-            target);
-      }
-      String targetNode;
-      // Use the assigned replica positions, if target is null or empty 
(checked above)
-      if (replicaPositions != null) {
-        targetNode = replicaPositions.get(replicaPositionIdx).node;
-        replicaPositionIdx++;
-      } else {
-        targetNode = target;
-      }
-      ZkNodeProps msg =
-          sourceReplica
-              .plus("parallel", String.valueOf(parallel))
-              .plus(CoreAdminParams.NODE, targetNode);
-      if (async != null) msg.getProperties().put(ASYNC, async);
-      NamedList<Object> nl = new NamedList<>();
-      final ZkNodeProps addedReplica =
-          new AddReplicaCmd(ccc)
-              .addReplica(
-                  clusterState,
-                  msg,
-                  nl,
-                  () -> {
-                    countDownLatch.countDown();
-                    if (nl.get("failure") != null) {
-                      String errorString =
-                          String.format(
-                              Locale.ROOT,
-                              "Failed to create replica for collection=%s 
shard=%s" + " on node=%s",
-                              sourceCollection,
-                              sourceReplica.getStr(SHARD_ID_PROP),
-                              target);
-                      log.warn(errorString);
-                      // one replica creation failed. Make the best attempt to
-                      // delete all the replicas created so far in the target
-                      // and exit
-                      synchronized (results) {
-                        results.add("failure", errorString);
-                        anyOneFailed.set(true);
-                      }
-                    } else {
-                      if (log.isDebugEnabled()) {
-                        log.debug(
-                            "Successfully created replica for collection={} 
shard={} on node={}",
-                            sourceCollection,
-                            sourceReplica.getStr(SHARD_ID_PROP),
-                            target);
-                      }
-                    }
-                  })
-              .get(0);
-
-      if (addedReplica != null) {
-        createdReplicas.add(addedReplica);
-        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || 
waitForFinalState) {
-          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-          String replicaName = 
sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-          String collectionName = sourceCollection;
-          String key = collectionName + "_" + replicaName;
-          CollectionStateWatcher watcher;
-          if (waitForFinalState) {
-            watcher =
-                new ActiveReplicaWatcher(
-                    collectionName,
-                    null,
-                    
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)),
-                    replicasToRecover);
-          } else {
-            watcher =
-                new LeaderRecoveryWatcher(
-                    collectionName,
-                    shardName,
-                    replicaName,
-                    addedReplica.getStr(ZkStateReader.CORE_NAME_PROP),
-                    replicasToRecover);
-          }
-          watchers.put(key, watcher);
-          log.debug("--- adding {}, {}", key, watcher);
-          zkStateReader.registerCollectionStateWatcher(collectionName, 
watcher);
-        } else {
-          log.debug("--- not waiting for {}", addedReplica);
-        }
+      List<ReplicaPosition> replicaPositions =
+          assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests);
+      int position = 0;
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, 
replicaPositions.get(position++).node);
       }
-    }
-
-    log.debug("Waiting for replicas to be added");
-    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for replicas to be added");
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for replicas to be added");
-    }
-
-    // now wait for leader replicas to recover
-    log.debug("Waiting for {} leader replicas to recover", numLeaders);
-    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
-      if (log.isInfoEnabled()) {
-        log.info(
-            "Timed out waiting for {} leader replicas to recover", 
replicasToRecover.getCount());
-      }
-      anyOneFailed.set(true);
     } else {
-      log.debug("Finished waiting for leader replicas to recover");
-    }
-    // remove the watchers, we're done either way
-    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
-      zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
-    }
-    if (anyOneFailed.get()) {
-      log.info("Failed to create some replicas. Cleaning up all replicas on 
target node");
-      SolrCloseableLatch cleanupLatch =
-          new SolrCloseableLatch(createdReplicas.size(), 
ccc.getCloseableToLatchOn());
-      for (ZkNodeProps createdReplica : createdReplicas) {
-        NamedList<Object> deleteResult = new NamedList<>();
-        try {
-          new DeleteReplicaCmd(ccc)
-              .deleteReplica(
-                  zkStateReader.getClusterState(),
-                  createdReplica.plus("parallel", "true"),
-                  deleteResult,
-                  () -> {
-                    cleanupLatch.countDown();
-                    if (deleteResult.get("failure") != null) {
-                      synchronized (results) {
-                        results.add(
-                            "failure",
-                            "Could not cleanup, because of : " + 
deleteResult.get("failure"));
-                      }
-                    }
-                  });
-        } catch (KeeperException e) {
-          cleanupLatch.countDown();
-          log.warn("Error deleting replica ", e);
-        } catch (Exception e) {
-          log.warn("Error deleting replica ", e);
-          cleanupLatch.countDown();
-          throw e;
-        }
+      for (Replica sourceReplica : sourceReplicas) {
+        replicaMovements.put(sourceReplica, target);
       }
-      cleanupLatch.await(5, TimeUnit.MINUTES);
-      return;
     }
 
-    // we have reached this far means all replicas could be recreated
-    // now cleanup the replicas in the source node
-    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, 
async);
-    results.add(
-        "success",
-        "REPLACENODE action completed successfully from  : " + source + " to : 
" + target);
+    boolean migrationSuccessful =
+        ReplicaMigrationUtils.migrateReplicas(
+            ccc, replicaMovements, parallel, waitForFinalState, timeout, 
async, results);
+    if (migrationSuccessful) {
+      results.add(
+          "success",
+          "REPLACENODE action completed successfully from  : " + source + " to 
: " + target);

Review Comment:
   So I think this is confusing. The asyncId is passed to the child commands 
(addReplica and cleanupReplicas), but the commands aren't async themselves. 
They would have to be added to the overseer queue for that to be the case 
(right?).
   
   I think the asyncId is just used for tracking. So when execution reaches this 
point, the migration has actually succeeded.
   
   As for the waitForFinalState=false case, I think "completed" is fine, since 
the user didn't ask for the final state to be healthy at the end of the 
migration. And it's much closer to "completed" than "submitted" IMO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org

Reply via email to