tflobbe commented on code in PR #1650: URL: https://github.com/apache/solr/pull/1650#discussion_r1223482384
########## solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java: ########## @@ -135,190 +100,35 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu assignRequests.add(assignRequest); } Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); - replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests); - } - int replicaPositionIdx = 0; - for (ZkNodeProps sourceReplica : sourceReplicas) { - String sourceCollection = sourceReplica.getStr(COLLECTION_PROP); - if (log.isInfoEnabled()) { - log.info( - "Going to create replica for collection={} shard={} on node={}", - sourceCollection, - sourceReplica.getStr(SHARD_ID_PROP), - target); - } - String targetNode; - // Use the assigned replica positions, if target is null or empty (checked above) - if (replicaPositions != null) { - targetNode = replicaPositions.get(replicaPositionIdx).node; - replicaPositionIdx++; - } else { - targetNode = target; - } - ZkNodeProps msg = - sourceReplica - .plus("parallel", String.valueOf(parallel)) - .plus(CoreAdminParams.NODE, targetNode); - if (async != null) msg.getProperties().put(ASYNC, async); - NamedList<Object> nl = new NamedList<>(); - final ZkNodeProps addedReplica = - new AddReplicaCmd(ccc) - .addReplica( - clusterState, - msg, - nl, - () -> { - countDownLatch.countDown(); - if (nl.get("failure") != null) { - String errorString = - String.format( - Locale.ROOT, - "Failed to create replica for collection=%s shard=%s" + " on node=%s", - sourceCollection, - sourceReplica.getStr(SHARD_ID_PROP), - target); - log.warn(errorString); - // one replica creation failed. 
Make the best attempt to - // delete all the replicas created so far in the target - // and exit - synchronized (results) { - results.add("failure", errorString); - anyOneFailed.set(true); - } - } else { - if (log.isDebugEnabled()) { - log.debug( - "Successfully created replica for collection={} shard={} on node={}", - sourceCollection, - sourceReplica.getStr(SHARD_ID_PROP), - target); - } - } - }) - .get(0); - - if (addedReplica != null) { - createdReplicas.add(addedReplica); - if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) { - String shardName = sourceReplica.getStr(SHARD_ID_PROP); - String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP); - String collectionName = sourceCollection; - String key = collectionName + "_" + replicaName; - CollectionStateWatcher watcher; - if (waitForFinalState) { - watcher = - new ActiveReplicaWatcher( - collectionName, - null, - Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), - replicasToRecover); - } else { - watcher = - new LeaderRecoveryWatcher( - collectionName, - shardName, - replicaName, - addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), - replicasToRecover); - } - watchers.put(key, watcher); - log.debug("--- adding {}, {}", key, watcher); - zkStateReader.registerCollectionStateWatcher(collectionName, watcher); - } else { - log.debug("--- not waiting for {}", addedReplica); - } + List<ReplicaPosition> replicaPositions = + assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests); + int position = 0; + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, replicaPositions.get(position++).node); } - } - - log.debug("Waiting for replicas to be added"); - if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { - log.info("Timed out waiting for replicas to be added"); - anyOneFailed.set(true); - } else { - log.debug("Finished waiting for replicas to be added"); - } - - // now wait for leader replicas to recover - 
log.debug("Waiting for {} leader replicas to recover", numLeaders); - if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) { - if (log.isInfoEnabled()) { - log.info( - "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount()); - } - anyOneFailed.set(true); } else { - log.debug("Finished waiting for leader replicas to recover"); - } - // remove the watchers, we're done either way - for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) { - zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue()); - } - if (anyOneFailed.get()) { - log.info("Failed to create some replicas. Cleaning up all replicas on target node"); - SolrCloseableLatch cleanupLatch = - new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn()); - for (ZkNodeProps createdReplica : createdReplicas) { - NamedList<Object> deleteResult = new NamedList<>(); - try { - new DeleteReplicaCmd(ccc) - .deleteReplica( - zkStateReader.getClusterState(), - createdReplica.plus("parallel", "true"), - deleteResult, - () -> { - cleanupLatch.countDown(); - if (deleteResult.get("failure") != null) { - synchronized (results) { - results.add( - "failure", - "Could not cleanup, because of : " + deleteResult.get("failure")); - } - } - }); - } catch (KeeperException e) { - cleanupLatch.countDown(); - log.warn("Error deleting replica ", e); - } catch (Exception e) { - log.warn("Error deleting replica ", e); - cleanupLatch.countDown(); - throw e; - } + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, target); } - cleanupLatch.await(5, TimeUnit.MINUTES); - return; } - // we have reached this far means all replicas could be recreated - // now cleanup the replicas in the source node - DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, async); - results.add( - "success", - "REPLACENODE action completed successfully from : " + source + " to : " + target); + boolean migrationSuccessful = + 
ReplicaMigrationUtils.migrateReplicas( + ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results); + if (migrationSuccessful) { + results.add( + "success", + "REPLACENODE action completed successfully from : " + source + " to : " + target); + } } - static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) { - List<ZkNodeProps> sourceReplicas = new ArrayList<>(); + static List<Replica> getReplicasOfNode(String source, ClusterState state) { Review Comment: maybe rename `source` to `node` or `nodeName`? ########## solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.solr.cloud.ActiveReplicaWatcher; +import org.apache.solr.common.SolrCloseableLatch; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.NamedList; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicaMigrationUtils { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Code to migrate replicas to already chosen nodes. This will create new replicas, and delete the + * old replicas after the creation is done. + * + * <p>If an error occurs during the creation of new replicas, all new replicas will be deleted. 
+ * + * @param ccc The collection command context to use from the API that calls this method + * @param movements a map from replica to the new node that the replica should live on + * @param parallel whether the replica creations should be done in parallel + * @param waitForFinalState wait for the final state of all newly created replicas before + * continuing + * @param timeout the amount of time to wait for new replicas to be created + * @param asyncId If provided, the command will be run under the given asyncId + * @param results push results (successful and failure) onto this list + * @return whether the command was successful + */ + static boolean migrateReplicas( + CollectionCommandContext ccc, + Map<Replica, String> movements, + boolean parallel, + boolean waitForFinalState, + int timeout, + String asyncId, + NamedList<Object> results) + throws IOException, InterruptedException, KeeperException { + // how many leaders are we moving? for these replicas we have to make sure that either: + // * another existing replica can become a leader, or + // * we wait until the newly created replica completes recovery (and can become the new leader) + // If waitForFinalState=true we wait for all replicas + int numLeaders = 0; + for (Replica replica : movements.keySet()) { + if (replica.isLeader() || waitForFinalState) { + numLeaders++; + } + } + // map of collectionName_coreNodeName to watchers + Map<String, CollectionStateWatcher> watchers = new HashMap<>(); + List<ZkNodeProps> createdReplicas = new ArrayList<>(); + + AtomicBoolean anyOneFailed = new AtomicBoolean(false); + SolrCloseableLatch countDownLatch = + new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn()); + + SolrCloseableLatch replicasToRecover = + new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn()); + + ClusterState clusterState = ccc.getZkStateReader().getClusterState(); + + for (Map.Entry<Replica, String> movement : movements.entrySet()) { + Replica sourceReplica = 
movement.getKey(); + String targetNode = movement.getValue(); + String sourceCollection = sourceReplica.getCollection(); + if (log.isInfoEnabled()) { + log.info( + "Going to create replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + + ZkNodeProps msg = + sourceReplica + .toFullProps() + .plus("parallel", String.valueOf(parallel)) + .plus(CoreAdminParams.NODE, targetNode); + if (asyncId != null) msg.getProperties().put(ASYNC, asyncId); + NamedList<Object> nl = new NamedList<>(); + final ZkNodeProps addedReplica = + new AddReplicaCmd(ccc) + .addReplica( + clusterState, + msg, + nl, + () -> { + countDownLatch.countDown(); + if (nl.get("failure") != null) { + String errorString = + String.format( + Locale.ROOT, + "Failed to create replica for collection=%s shard=%s" + " on node=%s", + sourceCollection, + sourceReplica.getShard(), + targetNode); + log.warn(errorString); + // one replica creation failed. Make the best attempt to + // delete all the replicas created so far in the target + // and exit + synchronized (results) { + results.add("failure", errorString); + anyOneFailed.set(true); + } + } else { + if (log.isDebugEnabled()) { + log.debug( + "Successfully created replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + } + }) + .get(0); + + if (addedReplica != null) { + createdReplicas.add(addedReplica); + if (sourceReplica.isLeader() || waitForFinalState) { + String shardName = sourceReplica.getShard(); + String replicaName = sourceReplica.getName(); + String key = sourceCollection + "_" + replicaName; + CollectionStateWatcher watcher; + if (waitForFinalState) { + watcher = + new ActiveReplicaWatcher( + sourceCollection, + null, + Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), + replicasToRecover); + } else { + watcher = + new LeaderRecoveryWatcher( + sourceCollection, + shardName, + replicaName, + 
addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), + replicasToRecover); + } + watchers.put(key, watcher); + log.debug("--- adding {}, {}", key, watcher); + ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, watcher); + } else { + log.debug("--- not waiting for {}", addedReplica); + } + } + } + + log.debug("Waiting for replicas to be added"); + if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { + log.info("Timed out waiting for replicas to be added"); + anyOneFailed.set(true); + } else { + log.debug("Finished waiting for replicas to be added"); + } + + // now wait for leader replicas to recover + log.debug("Waiting for {} leader replicas to recover", numLeaders); + if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) { + if (log.isInfoEnabled()) { + log.info( + "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount()); + } + anyOneFailed.set(true); + } else { + log.debug("Finished waiting for leader replicas to recover"); + } + // remove the watchers, we're done either way + for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) { + ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), e.getValue()); + } + if (anyOneFailed.get()) { + log.info("Failed to create some replicas. 
Cleaning up all replicas on target node"); + SolrCloseableLatch cleanupLatch = + new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn()); + for (ZkNodeProps createdReplica : createdReplicas) { + NamedList<Object> deleteResult = new NamedList<>(); + try { + new DeleteReplicaCmd(ccc) + .deleteReplica( + ccc.getZkStateReader().getClusterState(), + createdReplica.plus("parallel", "true"), + deleteResult, + () -> { + cleanupLatch.countDown(); + if (deleteResult.get("failure") != null) { + synchronized (results) { + results.add( + "failure", + "Could not cleanup, because of : " + deleteResult.get("failure")); + } + } + }); + } catch (KeeperException e) { + cleanupLatch.countDown(); + log.warn("Error deleting replica ", e); + } catch (Exception e) { + log.warn("Error deleting replica ", e); + cleanupLatch.countDown(); + throw e; + } + } + cleanupLatch.await(5, TimeUnit.MINUTES); + return false; + } + + // we have reached this far, meaning all replicas should have been recreated. 
+ // now cleanup the original replicas + return cleanupReplicas( + results, ccc.getZkStateReader().getClusterState(), movements.keySet(), ccc, asyncId); + } + + static boolean cleanupReplicas( + NamedList<Object> results, + ClusterState clusterState, + Collection<Replica> sourceReplicas, + CollectionCommandContext ccc, + String async) + throws IOException, InterruptedException { + CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size()); + for (Replica sourceReplica : sourceReplicas) { + String coll = sourceReplica.getCollection(); + String shard = sourceReplica.getShard(); + String type = sourceReplica.getType().toString(); + String node = sourceReplica.getNodeName(); + log.info( + "Deleting replica type={} for collection={} shard={} on node={}", + type, + coll, + shard, + node); + NamedList<Object> deleteResult = new NamedList<>(); + try { + ZkNodeProps cmdMessage = sourceReplica.toFullProps(); + if (async != null) cmdMessage = cmdMessage.plus(ASYNC, async); + new DeleteReplicaCmd(ccc) + .deleteReplica( + clusterState, + cmdMessage.plus("parallel", "true"), + deleteResult, + () -> { + cleanupLatch.countDown(); + if (deleteResult.get("failure") != null) { + synchronized (results) { + results.add( + "failure", + String.format( + Locale.ROOT, + "Failed to delete replica for collection=%s shard=%s" + " on node=%s", Review Comment: No need to concatenate these two string literals; a single format string is enough. ########## solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java: ########## @@ -251,1006 +231,463 @@ private AffinityPlacementPlugin( // We make things reproducible in tests by using test seed if any String seed = System.getProperty("tests.seed"); - if (seed != null) { - replicaPlacementRandom.setSeed(seed.hashCode()); - } - } - - @Override - @SuppressForbidden( - reason = - "Ordering.arbitrary() has no equivalent in Comparator class. 
Rather reuse than copy.") - public List<PlacementPlan> computePlacements( - Collection<PlacementRequest> requests, PlacementContext placementContext) - throws PlacementException { - List<PlacementPlan> placementPlans = new ArrayList<>(requests.size()); - Set<Node> allNodes = new HashSet<>(); - Set<SolrCollection> allCollections = new HashSet<>(); - for (PlacementRequest request : requests) { - allNodes.addAll(request.getTargetNodes()); - allCollections.add(request.getCollection()); - } - - // Fetch attributes for a superset of all nodes requested amongst the placementRequests - AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher(); - attributeFetcher - .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); - attributeFetcher - .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES) - .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB); - Set<ReplicaMetric<?>> replicaMetrics = Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB); - for (SolrCollection collection : allCollections) { - attributeFetcher.requestCollectionMetrics(collection, replicaMetrics); - } - attributeFetcher.fetchFrom(allNodes); - final AttributeValues attrValues = attributeFetcher.fetchAttributes(); - // Get the number of currently existing cores per node, so we can update as we place new cores - // to not end up always selecting the same node(s). 
This is used across placement requests - Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes, attrValues); - - boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes, attrValues); - - // Keep track with nodesWithReplicas across requests - Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new HashMap<>(); - for (PlacementRequest request : requests) { - Set<Node> nodes = request.getTargetNodes(); - SolrCollection solrCollection = request.getCollection(); - - // filter out nodes that don't meet the `withCollection` constraint - nodes = - filterNodesWithCollection(placementContext.getCluster(), request, attrValues, nodes); - // filter out nodes that don't match the "node types" specified in the collection props - nodes = filterNodesByNodeType(placementContext.getCluster(), request, attrValues, nodes); - - // All available zones of live nodes. Due to some nodes not being candidates for placement, - // and some existing replicas being one availability zones that might be offline (i.e. their - // nodes are not live), this set might contain zones on which it is impossible to place - // replicas. That's ok. - Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues); - - // Build the replica placement decisions here - Set<ReplicaPlacement> replicaPlacements = new HashSet<>(); - - // Let's now iterate on all shards to create replicas for and start finding home sweet homes - // for the replicas - for (String shardName : request.getShardNames()) { - ReplicaMetrics leaderMetrics = - attrValues - .getCollectionMetrics(solrCollection.getName()) - .flatMap(colMetrics -> colMetrics.getShardMetrics(shardName)) - .flatMap(ShardMetrics::getLeaderMetrics) - .orElse(null); - - // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can - // overlap - // if nodes accept multiple replica types). 
These subsets sets are actually maps, because - // we - // capture the number of cores (of any replica type) present on each node. - // - // This also filters out nodes that will not satisfy the rules if the replica is placed - // there - EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = - getAvailableNodesForReplicaTypes(nodes, attrValues, leaderMetrics); - - // Inventory nodes (if any) that already have a replica of any type for the shard, because - // we can't be placing additional replicas on these. This data structure is updated after - // each replica to node assign and is used to make sure different replica types are not - // allocated to the same nodes (protecting same node assignments within a given replica - // type is done "by construction" in makePlacementDecisions()). - Set<Node> nodesWithReplicas = - allNodesWithReplicas - .computeIfAbsent(solrCollection.getName(), col -> new HashMap<>()) - .computeIfAbsent( - shardName, - s -> { - Set<Node> newNodeSet = new HashSet<>(); - Shard shard = solrCollection.getShard(s); - if (shard != null) { - // Prefill the set with the existing replicas - for (Replica r : shard.replicas()) { - newNodeSet.add(r.getNode()); - } - } - return newNodeSet; - }); - - // Iterate on the replica types in the enum order. We place more strategic replicas first - // (NRT is more strategic than TLOG more strategic than PULL). This is in case we - // eventually decide that less strategic replica placement impossibility is not a problem - // that should lead to replica placement computation failure. Current code does fail if - // placement is impossible (constraint is at most one replica of a shard on any node). 
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - int numReplicasToCreate = request.getCountReplicasToCreate(replicaType); - if (numReplicasToCreate > 0) { - makePlacementDecisions( - solrCollection, - shardName, - availabilityZones, - replicaType, - numReplicasToCreate, - attrValues, - leaderMetrics, - replicaTypeToNodes, - nodesWithReplicas, - allCoresOnNodes, - placementContext.getPlacementPlanFactory(), - replicaPlacements, - doSpreadAcrossDomains); - } - } - } - placementPlans.add( - placementContext - .getPlacementPlanFactory() - .createPlacementPlan(request, replicaPlacements)); - } - - return placementPlans; - } - - private boolean shouldSpreadAcrossDomains(Set<Node> allNodes, AttributeValues attrValues) { - boolean doSpreadAcrossDomains = - spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues); - if (spreadAcrossDomains && !doSpreadAcrossDomains) { - log.warn( - "AffinityPlacementPlugin configured to spread across domains, but there are nodes in the cluster without the {} system property. 
Ignoring spreadAcrossDomains.", - AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); - } - return doSpreadAcrossDomains; - } - - private boolean spreadDomainPropPresent(Set<Node> allNodes, AttributeValues attrValues) { - // We can only use spread domains if all nodes have the system property - return allNodes.stream() - .noneMatch( - n -> - attrValues - .getSystemProperty(n, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) - .isEmpty()); } @Override - public void verifyAllowedModification( - ModificationRequest modificationRequest, PlacementContext placementContext) - throws PlacementModificationException { - if (modificationRequest instanceof DeleteShardsRequest) { - log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest); - } else if (modificationRequest instanceof DeleteCollectionRequest) { - verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext); - } else if (modificationRequest instanceof DeleteReplicasRequest) { - verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext); - } else { - log.warn("unsupported request type, skipping: {}", modificationRequest); - } - } - - private void verifyDeleteCollection( + protected void verifyDeleteCollection( DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext) throws PlacementModificationException { Cluster cluster = placementContext.getCluster(); - Set<String> colocatedCollections = - colocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of()); - for (String primaryName : colocatedCollections) { + Set<String> collocatedCollections = + collocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of()); + for (String primaryName : collocatedCollections) { try { if (cluster.getCollection(primaryName) != null) { // still exists throw new PlacementModificationException( - "colocated collection " + "collocated collection " + primaryName + " of " + 
deleteCollectionRequest.getCollection().getName() + " still present"); } } catch (IOException e) { throw new PlacementModificationException( - "failed to retrieve colocated collection information", e); + "failed to retrieve collocated collection information", e); } } } - private void verifyDeleteReplicas( - DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext) - throws PlacementModificationException { - Cluster cluster = placementContext.getCluster(); - SolrCollection secondaryCollection = deleteReplicasRequest.getCollection(); - Set<String> colocatedCollections = colocatedWith.get(secondaryCollection.getName()); - if (colocatedCollections == null) { - return; - } - Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new HashMap<>(); - secondaryCollection - .shards() - .forEach( - shard -> - shard - .replicas() - .forEach( - replica -> - secondaryNodeShardReplicas - .computeIfAbsent(replica.getNode(), n -> new HashMap<>()) - .computeIfAbsent( - replica.getShard().getShardName(), s -> new AtomicInteger()) - .incrementAndGet())); + private static final class AffinityPlacementContext { + private final Set<String> allSpreadDomains = new HashSet<>(); + private final Map<String, Map<String, ReplicaSpread>> spreadDomainUsage = new HashMap<>(); + private final Set<String> allAvailabilityZones = new HashSet<>(); + private final Map<String, Map<String, Map<Replica.ReplicaType, ReplicaSpread>>> + availabilityZoneUsage = new HashMap<>(); + private boolean doSpreadAcrossDomains; + } - // find the colocated-with collections - Map<Node, Set<String>> colocatingNodes = new HashMap<>(); - try { - for (String colocatedCollection : colocatedCollections) { - SolrCollection coll = cluster.getCollection(colocatedCollection); - coll.shards() - .forEach( - shard -> - shard - .replicas() - .forEach( - replica -> - colocatingNodes - .computeIfAbsent(replica.getNode(), n -> new HashSet<>()) - .add(coll.getName()))); - } - } catch (IOException ioe) { - 
throw new PlacementModificationException( - "failed to retrieve colocated collection information", ioe); - } - PlacementModificationException exception = null; - for (Replica replica : deleteReplicasRequest.getReplicas()) { - if (!colocatingNodes.containsKey(replica.getNode())) { - continue; - } - // check that there will be at least one replica remaining - AtomicInteger secondaryCount = - secondaryNodeShardReplicas - .getOrDefault(replica.getNode(), Map.of()) - .getOrDefault(replica.getShard().getShardName(), new AtomicInteger()); - if (secondaryCount.get() > 1) { - // we can delete it - record the deletion - secondaryCount.decrementAndGet(); - continue; - } - // fail - this replica cannot be removed - if (exception == null) { - exception = new PlacementModificationException("delete replica(s) rejected"); + @Override + protected Map<Node, WeightedNode> getBaseWeightedNodes( + PlacementContext placementContext, + Set<Node> nodes, + Iterable<SolrCollection> relevantCollections, + boolean skipNodesWithErrors) + throws PlacementException { + // Fetch attributes for a superset of all nodes requested amongst the placementRequests + AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher(); + attributeFetcher + .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); + attributeFetcher + .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES) + .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB); + Set<ReplicaMetric<?>> replicaMetrics = Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB); + Set<String> requestedCollections = new HashSet<>(); + for (SolrCollection collection : relevantCollections) { + if (requestedCollections.add(collection.getName())) { + attributeFetcher.requestCollectionMetrics(collection, replicaMetrics); 
} - exception.addRejectedModification( - replica.toString(), - "co-located with replicas of " + colocatingNodes.get(replica.getNode())); - } - if (exception != null) { - throw exception; } - } + attributeFetcher.fetchFrom(nodes); + final AttributeValues attrValues = attributeFetcher.fetchAttributes(); - private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) { - Set<String> azs = new HashSet<>(); + AffinityPlacementContext affinityPlacementContext = new AffinityPlacementContext(); + affinityPlacementContext.doSpreadAcrossDomains = spreadAcrossDomains; - for (Node n : nodes) { - azs.add(getNodeAZ(n, attrValues)); + Map<Node, WeightedNode> affinityNodeMap = CollectionUtil.newHashMap(nodes.size()); + for (Node node : nodes) { + AffinityNode affinityNode = + newNodeFromMetrics(node, attrValues, affinityPlacementContext, skipNodesWithErrors); + if (affinityNode != null) { + affinityNodeMap.put(node, affinityNode); + } } - return Collections.unmodifiableSet(azs); + return affinityNodeMap; } - /** - * Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property - * {@link AffinityPlacementConfig#AVAILABILITY_ZONE_SYSPROP} to then return {@link - * AffinityPlacementConfig#UNDEFINED_AVAILABILITY_ZONE} as the AZ name. - */ - private String getNodeAZ(Node n, final AttributeValues attrValues) { - Optional<String> nodeAz = - attrValues.getSystemProperty(n, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP); - // All nodes with undefined AZ will be considered part of the same AZ. This also works for - // deployments that do not care about AZ's - return nodeAz.orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE); - } - - /** - * This class captures an availability zone and the nodes that are legitimate targets for - * replica placement in that Availability Zone. Instances are used as values in a {@link - * java.util.TreeMap} in which the total number of already existing replicas in the AZ is the - * key. 
This allows easily picking the set of nodes from which to select a node for placement in - * order to balance the number of replicas per AZ. Picking one of the nodes from the set is done - * using different criteria unrelated to the Availability Zone (picking the node is based on the - * {@link CoresAndDiskComparator} ordering). - */ - private static class AzWithNodes { - final String azName; - private final boolean useSpreadDomains; - private boolean listIsSorted = false; - private final Comparator<Node> nodeComparator; - private final Random random; - private final List<Node> availableNodesForPlacement; - private final AttributeValues attributeValues; - private final ReplicaMetrics leaderMetrics; - private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains; - private final Map<String, Integer> currentSpreadDomainUsageUsage; - private int numNodesForPlacement; - - AzWithNodes( - String azName, - List<Node> availableNodesForPlacement, - boolean useSpreadDomains, - Comparator<Node> nodeComparator, - Random random, - AttributeValues attributeValues, - ReplicaMetrics leaderMetrics, - Map<String, Integer> currentSpreadDomainUsageUsage) { - this.azName = azName; - this.availableNodesForPlacement = availableNodesForPlacement; - this.useSpreadDomains = useSpreadDomains; - this.nodeComparator = nodeComparator; - this.random = random; - this.attributeValues = attributeValues; - this.leaderMetrics = leaderMetrics; - this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage; - this.numNodesForPlacement = availableNodesForPlacement.size(); - } - - private boolean hasBeenSorted() { - return (useSpreadDomains && sortedSpreadDomains != null) - || (!useSpreadDomains && listIsSorted); - } - - void ensureSorted() { - if (!hasBeenSorted()) { - sort(); - } + AffinityNode newNodeFromMetrics( + Node node, + AttributeValues attrValues, + AffinityPlacementContext affinityPlacementContext, + boolean skipNodesWithErrors) + throws PlacementException { + Set<Replica.ReplicaType> 
supportedReplicaTypes = + attrValues.getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).stream() + .flatMap(s -> Arrays.stream(s.split(","))) + .map(String::trim) + .map(s -> s.toUpperCase(Locale.ROOT)) + .map( + s -> { + try { + return Replica.ReplicaType.valueOf(s); + } catch (IllegalArgumentException e) { + log.warn( + "Node {} has an invalid value for the {} systemProperty: {}", + node.getName(), + AffinityPlacementConfig.REPLICA_TYPE_SYSPROP, + s); + return null; + } + }) + .collect(Collectors.toSet()); + if (supportedReplicaTypes.isEmpty()) { + // If property not defined or is only whitespace on a node, assuming node can take any + // replica type + supportedReplicaTypes = Set.of(Replica.ReplicaType.values()); } - private void sort() { - assert !listIsSorted && sortedSpreadDomains == null - : "We shouldn't be sorting this list again"; - - // Make sure we do not tend to use always the same nodes (within an AZ) if all - // conditions are identical (well, this likely is not the case since after having added - // a replica to a node its number of cores increases for the next placement decision, - // but let's be defensive here, given that multiple concurrent placement decisions might - // see the same initial cluster state, and we want placement to be reasonable even in - // that case without creating an unnecessary imbalance). For example, if all nodes have - // 0 cores and same amount of free disk space, ideally we want to pick a random node for - // placement, not always the same one due to some internal ordering. - Collections.shuffle(availableNodesForPlacement, random); - - if (useSpreadDomains) { - // When we use spread domains, we don't just sort the list of nodes, instead we generate a - // TreeSet of SpreadDomainWithNodes, - // sorted by the number of times the domain has been used. 
Each - // SpreadDomainWithNodes internally contains the list of nodes that belong to that - // particular domain, - // and it's sorted internally by the comparator passed to this - // class (which is the same that's used when not using spread domains). - // Whenever a node from a particular SpreadDomainWithNodes is selected as the best - // candidate, the call to "removeBestNode" will: - // 1. Remove the SpreadDomainWithNodes instance from the TreeSet - // 2. Remove the best node from the list within the SpreadDomainWithNodes - // 3. Increment the count of times the domain has been used - // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if there are still nodes - // available - HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new HashMap<>(); - for (Node node : availableNodesForPlacement) { - spreadDomainToListOfNodesMap - .computeIfAbsent( - attributeValues - .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) - .get(), - k -> new ArrayList<>()) - .add(node); - } - sortedSpreadDomains = - new TreeSet<>(new SpreadDomainComparator(currentSpreadDomainUsageUsage)); - - int i = 0; - for (Map.Entry<String, List<Node>> entry : spreadDomainToListOfNodesMap.entrySet()) { - // Sort the nodes within the spread domain by the provided comparator - entry.getValue().sort(nodeComparator); - sortedSpreadDomains.add( - new SpreadDomainWithNodes(entry.getKey(), entry.getValue(), i++, nodeComparator)); + Set<String> nodeType; + Optional<String> nodePropOpt = + attrValues.getSystemProperty(node, AffinityPlacementConfig.NODE_TYPE_SYSPROP); + if (nodePropOpt.isEmpty()) { + nodeType = Collections.emptySet(); + } else { + nodeType = new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ',')); + } + + Optional<Double> nodeFreeDiskGB = + attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB); + Optional<Integer> nodeNumCores = + attrValues.getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES); + String az = + attrValues + 
.getSystemProperty(node, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) + .orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE); + affinityPlacementContext.allAvailabilityZones.add(az); + String spreadDomain; + if (affinityPlacementContext.doSpreadAcrossDomains) { + spreadDomain = + attrValues + .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) + .orElse(null); + if (spreadDomain == null) { + if (log.isWarnEnabled()) { + log.warn( + "AffinityPlacementPlugin configured to spread across domains, but node {} does not have the {} system property. Ignoring spreadAcrossDomains.", + node.getName(), + AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); } + affinityPlacementContext.doSpreadAcrossDomains = false; + affinityPlacementContext.allSpreadDomains.clear(); } else { - availableNodesForPlacement.sort(nodeComparator); - listIsSorted = true; + affinityPlacementContext.allSpreadDomains.add(spreadDomain); } + } else { + spreadDomain = null; } - - Node getBestNode() { - assert hasBeenSorted(); - if (useSpreadDomains) { - return sortedSpreadDomains.first().sortedNodesForPlacement.get(0); - } else { - return availableNodesForPlacement.get(0); + if (nodeFreeDiskGB.isEmpty() && skipNodesWithErrors) { + if (log.isWarnEnabled()) { + log.warn( + "Unknown free disk on node {}, excluding it from placement decisions.", + node.getName()); } - } - - public Node removeBestNode() { - assert hasBeenSorted(); - this.numNodesForPlacement--; - Node n; - if (useSpreadDomains) { - // Since this SpreadDomainWithNodes needs to be re-sorted in the sortedSpreadDomains, we - // remove it and then re-add it, once the best node has been removed. 
- SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst(); - n = group.sortedNodesForPlacement.remove(0); - this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1, Integer::sum); - if (!group.sortedNodesForPlacement.isEmpty()) { - sortedSpreadDomains.add(group); - } - } else { - n = availableNodesForPlacement.remove(0); + return null; + } else if (nodeNumCores.isEmpty() && skipNodesWithErrors) { + if (log.isWarnEnabled()) { + log.warn( + "Unknown number of cores on node {}, excluding it from placement decisions.", + node.getName()); } - Optional.ofNullable(leaderMetrics) - .flatMap(lrm -> lrm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB)) - .ifPresent( - indexSize -> - attributeValues.decreaseNodeMetric( - n, BuiltInMetrics.NODE_FREE_DISK_GB, indexSize)); - attributeValues.increaseNodeMetric(n, BuiltInMetrics.NODE_NUM_CORES, 1); - return n; - } - - public int numNodes() { - return this.numNodesForPlacement; + return null; + } else { + return new AffinityNode( + node, + attrValues, + affinityPlacementContext, + supportedReplicaTypes, + nodeType, + nodeNumCores.orElse(0), + nodeFreeDiskGB.orElse(0D), + az, + spreadDomain); } } - /** - * This class represents group of nodes with the same {@link - * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. - */ - static class SpreadDomainWithNodes implements Comparable<SpreadDomainWithNodes> { + private class AffinityNode extends WeightedNode { - /** - * This is the label that all nodes in this group have in {@link - * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. - */ - final String spreadDomainName; + private final AttributeValues attrValues; - /** - * The list of all nodes that contain the same {@link - * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be sorted before creating - * this class. 
- */ - private final List<Node> sortedNodesForPlacement; + private final AffinityPlacementContext affinityPlacementContext; - /** - * This is used for tie breaking the sort of {@link SpreadDomainWithNodes}, when the - * nodeComparator between the top nodes of each group return 0. - */ - private final int tieBreaker; + private final Set<Replica.ReplicaType> supportedReplicaTypes; + private final Set<String> nodeType; - /** - * This is the comparator that is used to compare the top nodes in the {@link - * #sortedNodesForPlacement} lists. Must be the same that was used to sort {@link - * #sortedNodesForPlacement}. - */ - private final Comparator<Node> nodeComparator; + private int coresOnNode; + private double nodeFreeDiskGB; - public SpreadDomainWithNodes( - String spreadDomainName, - List<Node> sortedNodesForPlacement, - int tieBreaker, - Comparator<Node> nodeComparator) { - this.spreadDomainName = spreadDomainName; - this.sortedNodesForPlacement = sortedNodesForPlacement; - this.tieBreaker = tieBreaker; - this.nodeComparator = nodeComparator; + private final String availabilityZone; + private final String spreadDomain; + + AffinityNode( + Node node, + AttributeValues attrValues, + AffinityPlacementContext affinityPlacementContext, + Set<Replica.ReplicaType> supportedReplicaTypes, + Set<String> nodeType, + int coresOnNode, + double nodeFreeDiskGB, + String az, + String spreadDomain) { + super(node); + this.attrValues = attrValues; + this.affinityPlacementContext = affinityPlacementContext; + this.supportedReplicaTypes = supportedReplicaTypes; + this.nodeType = nodeType; + this.coresOnNode = coresOnNode; + this.nodeFreeDiskGB = nodeFreeDiskGB; + this.availabilityZone = az; + this.spreadDomain = spreadDomain; } @Override - public int compareTo(SpreadDomainWithNodes o) { - if (o == this) { - return 0; - } - int result = - nodeComparator.compare( - this.sortedNodesForPlacement.get(0), o.sortedNodesForPlacement.get(0)); - if (result == 0) { - return 
Integer.compare(this.tieBreaker, o.tieBreaker); - } - return result; + public int calcWeight() { + return coresOnNode + + 100 * (prioritizedFreeDiskGB > 0 && nodeFreeDiskGB < prioritizedFreeDiskGB ? 1 : 0) + + 10000 * getSpreadDomainWeight() + + 1000000 * getAZWeight(); } - } - - /** - * Builds the number of existing cores on each node returned in the attrValues. Nodes for which - * the number of cores is not available for whatever reason are excluded from acceptable - * candidate nodes as it would not be possible to make any meaningful placement decisions. - * - * @param nodes all nodes on which this plugin should compute placement - * @param attrValues attributes fetched for the nodes. This method uses system property {@link - * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each - * node. - */ - private Map<Node, Integer> getCoreCountPerNode( - Set<Node> nodes, final AttributeValues attrValues) { - Map<Node, Integer> coresOnNodes = new HashMap<>(); - for (Node node : nodes) { - attrValues - .getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES) - .ifPresent(count -> coresOnNodes.put(node, count)); + @Override + public int calcRelevantWeightWithReplica(Replica replica) { + return coresOnNode + + 100 + * (prioritizedFreeDiskGB > 0 + && nodeFreeDiskGB - getProjectedSizeOfReplica(replica) + < prioritizedFreeDiskGB + ? 1 + : 0) + + 10000 * projectReplicaSpreadWeight(replica) + + 1000000 * projectAZWeight(replica); } - return coresOnNodes; - } - - /** - * Given the set of all nodes on which to do placement and fetched attributes, builds the sets - * representing candidate nodes for placement of replicas of each replica type. These sets are - * packaged and returned in an EnumMap keyed by replica type. Nodes for which the number of - * cores is not available for whatever reason are excluded from acceptable candidate nodes as it - * would not be possible to make any meaningful placement decisions. 
- * - * @param nodes all nodes on which this plugin should compute placement - * @param attrValues attributes fetched for the nodes. This method uses system property {@link - * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each - * node. - */ - private EnumMap<Replica.ReplicaType, Set<Node>> getAvailableNodesForReplicaTypes( - Set<Node> nodes, final AttributeValues attrValues, final ReplicaMetrics leaderMetrics) { - EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = - new EnumMap<>(Replica.ReplicaType.class); - - for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - replicaTypeToNodes.put(replicaType, new HashSet<>()); + @Override + public boolean canAddReplica(Replica replica) { + String collection = replica.getShard().getCollection().getName(); + return + // By default, do not allow two replicas of the same shard on a node + super.canAddReplica(replica) + && supportedReplicaTypes.contains(replica.getType()) + && Optional.ofNullable(nodeTypes.get(collection)) + .map(s -> s.stream().anyMatch(nodeType::contains)) + .orElse(true) + && Optional.ofNullable(withCollections.get(collection)) + .map(this::hasCollectionOnNode) + .orElse(true) + && (minimalFreeDiskGB <= 0 + || nodeFreeDiskGB - getProjectedSizeOfReplica(replica) > minimalFreeDiskGB); } - for (Node node : nodes) { - // Exclude nodes with unknown or too small disk free space - Optional<Double> nodeFreeDiskGB = - attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB); - if (nodeFreeDiskGB.isEmpty()) { - if (log.isWarnEnabled()) { - log.warn( - "Unknown free disk on node {}, excluding it from placement decisions.", - node.getName()); - } - // We rely later on the fact that the free disk optional is present (see - // CoresAndDiskComparator), be careful it you change anything here. 
- continue; - } - double replicaIndexSize = - Optional.ofNullable(leaderMetrics) - .flatMap(lm -> lm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB)) - .orElse(0D); - double projectedFreeDiskIfPlaced = - BuiltInMetrics.NODE_FREE_DISK_GB.decrease(nodeFreeDiskGB.get(), replicaIndexSize); - if (projectedFreeDiskIfPlaced < minimalFreeDiskGB) { - if (log.isWarnEnabled()) { - log.warn( - "Node {} free disk ({}GB) minus the projected replica size ({}GB) is lower than configured" - + " minimum {}GB, excluding it from placement decisions.", - node.getName(), - nodeFreeDiskGB.get(), - replicaIndexSize, - minimalFreeDiskGB); - } - continue; - } - - if (attrValues.getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES).isEmpty()) { - if (log.isWarnEnabled()) { - log.warn( - "Unknown number of cores on node {}, excluding it from placement decisions.", - node.getName()); + @Override + public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) { + Map<Replica, String> replicaRemovalExceptions = new HashMap<>(); + Map<String, Map<String, Set<Replica>>> removals = new HashMap<>(); + for (Replica replica : replicas) { + SolrCollection collection = replica.getShard().getCollection(); + Set<String> collocatedCollections = new HashSet<>(); + Optional.ofNullable(collocatedWith.get(collection.getName())) + .ifPresent(collocatedCollections::addAll); + collocatedCollections.retainAll(getCollectionsOnNode()); + if (collocatedCollections.isEmpty()) { + continue; } - // We rely later on the fact that the number of cores optional is present (see - // CoresAndDiskComparator), be careful it you change anything here. - continue; - } - String supportedReplicaTypes = - attrValues - .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) - .isPresent() - ? 
attrValues - .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) - .get() - : null; - // If property not defined or is only whitespace on a node, assuming node can take any - // replica type - if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) { - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - replicaTypeToNodes.get(rt).add(node); - } - } else { - Set<String> acceptedTypes = - Arrays.stream(supportedReplicaTypes.split(",")) - .map(String::trim) - .map(s -> s.toLowerCase(Locale.ROOT)) - .collect(Collectors.toSet()); - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) { - replicaTypeToNodes.get(rt).add(node); - } + // There are collocatedCollections for this shard, so make sure there is a replica of this + // shard left on the node after it is removed + Set<Replica> replicasRemovedForShard = + removals + .computeIfAbsent( + replica.getShard().getCollection().getName(), k -> new HashMap<>()) + .computeIfAbsent(replica.getShard().getShardName(), k -> new HashSet<>()); + replicasRemovedForShard.add(replica); + + if (replicasRemovedForShard.size() + >= getReplicasForShardOnNode(replica.getShard()).size()) { + replicaRemovalExceptions.put( + replica, "co-located with replicas of " + collocatedCollections); } } + return replicaRemovalExceptions; } - return replicaTypeToNodes; - } - /** - * Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas. - * - * <p>The criteria used in this method are, in this order: - * - * <ol> - * <li>No more than one replica of a given shard on a given node (strictly enforced) - * <li>Balance as much as possible replicas of a given {@link - * org.apache.solr.cluster.Replica.ReplicaType} over available AZ's. This balancing takes - * into account existing replicas <b>of the corresponding replica type</b>, if any. 
- * <li>Place replicas if possible on nodes having more than a certain amount of free disk - * space (note that nodes with a too small amount of free disk space were eliminated as - * placement targets earlier, in {@link #getAvailableNodesForReplicaTypes(Set, - * AttributeValues, ReplicaMetrics)}). There's a threshold here rather than sorting on the - * amount of free disk space, because sorting on that value would in practice lead to - * never considering the number of cores on a node. - * <li>Place replicas on nodes having a smaller number of cores (the number of cores - * considered for this decision includes previous placement decisions made during the - * processing of the placement request) - * </ol> - */ - @SuppressForbidden( - reason = - "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") - private void makePlacementDecisions( - SolrCollection solrCollection, - String shardName, - Set<String> availabilityZones, - Replica.ReplicaType replicaType, - int numReplicas, - final AttributeValues attrValues, - final ReplicaMetrics leaderMetrics, - EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes, - Set<Node> nodesWithReplicas, - Map<Node, Integer> coresOnNodes, - PlacementPlanFactory placementPlanFactory, - Set<ReplicaPlacement> replicaPlacements, - boolean doSpreadAcrossDomains) - throws PlacementException { - // Count existing replicas per AZ. We count only instances of the type of replica for which we - // need to do placement. If we ever want to balance replicas of any type across AZ's (and not - // each replica type balanced independently), we'd have to move this data structure to the - // caller of this method so it can be reused across different replica type placements for a - // given shard. Note then that this change would be risky. For example all NRT's and PULL - // replicas for a shard my be correctly balanced over three AZ's, but then all NRT can end up - // in the same AZ... 
- Map<String, Integer> azToNumReplicas = new HashMap<>(); - for (String az : availabilityZones) { - azToNumReplicas.put(az, 0); + @Override + protected boolean addProjectedReplicaWeights(Replica replica) { + nodeFreeDiskGB -= getProjectedSizeOfReplica(replica); + coresOnNode += 1; + return addReplicaToAzAndSpread(replica); } - // Build the set of candidate nodes for the placement, i.e. nodes that can accept the replica - // type - Set<Node> candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType)); - // Remove nodes that already have a replica for the shard (no two replicas of same shard can - // be put on same node) - candidateNodes.removeAll(nodesWithReplicas); + @Override + protected void initReplicaWeights(Replica replica) { + addReplicaToAzAndSpread(replica); + } - // This Map will include the affinity labels for the nodes that are currently hosting replicas - // of this shard. It will be modified with new placement decisions. - Map<String, Integer> spreadDomainsInUse = new HashMap<>(); - Shard shard = solrCollection.getShard(shardName); - if (shard != null) { - // shard is non null if we're adding replicas to an already existing collection. - // If we're creating the collection, the shards do not exist yet. - for (Replica replica : shard.replicas()) { - // The node's AZ is counted as having a replica if it has a replica of the same type as - // the one we need to place here. - if (replica.getType() == replicaType) { - final String az = getNodeAZ(replica.getNode(), attrValues); - if (azToNumReplicas.containsKey(az)) { - // We do not count replicas on AZ's for which we don't have any node to place on - // because it would not help the placement decision. If we did want to do that, note - // the dereferencing below can't be assumed as the entry will not exist in the map. 
- azToNumReplicas.put(az, azToNumReplicas.get(az) + 1); - } - if (doSpreadAcrossDomains) { - attrValues - .getSystemProperty( - replica.getNode(), AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) - .ifPresent(nodeDomain -> spreadDomainsInUse.merge(nodeDomain, 1, Integer::sum)); - } - } + private boolean addReplicaToAzAndSpread(Replica replica) { + boolean needsResort = false; + needsResort |= Review Comment: you can remove the default value in the first line ########## solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.solr.cloud.ActiveReplicaWatcher; +import org.apache.solr.common.SolrCloseableLatch; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.NamedList; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicaMigrationUtils { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Code to migrate replicas to already chosen nodes. This will create new replicas, and delete the + * old replicas after the creation is done. + * + * <p>If an error occurs during the creation of new replicas, all new replicas will be deleted. 
+ * + * @param ccc The collection command context to use from the API that calls this method + * @param movements a map from replica to the new node that the replica should live on + * @param parallel whether the replica creations should be done in parallel + * @param waitForFinalState wait for the final state of all newly created replicas before + * continuing + * @param timeout the amount of time to wait for new replicas to be created + * @param asyncId If provided, the command will be run under the given asyncId + * @param results push results (successful and failure) onto this list + * @return whether the command was successful + */ + static boolean migrateReplicas( + CollectionCommandContext ccc, + Map<Replica, String> movements, + boolean parallel, + boolean waitForFinalState, + int timeout, + String asyncId, + NamedList<Object> results) + throws IOException, InterruptedException, KeeperException { + // how many leaders are we moving? for these replicas we have to make sure that either: + // * another existing replica can become a leader, or + // * we wait until the newly created replica completes recovery (and can become the new leader) + // If waitForFinalState=true we wait for all replicas + int numLeaders = 0; + for (Replica replica : movements.keySet()) { + if (replica.isLeader() || waitForFinalState) { + numLeaders++; + } + } + // map of collectionName_coreNodeName to watchers + Map<String, CollectionStateWatcher> watchers = new HashMap<>(); + List<ZkNodeProps> createdReplicas = new ArrayList<>(); + + AtomicBoolean anyOneFailed = new AtomicBoolean(false); + SolrCloseableLatch countDownLatch = + new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn()); + + SolrCloseableLatch replicasToRecover = + new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn()); + + ClusterState clusterState = ccc.getZkStateReader().getClusterState(); + + for (Map.Entry<Replica, String> movement : movements.entrySet()) { + Replica sourceReplica = 
movement.getKey(); + String targetNode = movement.getValue(); + String sourceCollection = sourceReplica.getCollection(); + if (log.isInfoEnabled()) { + log.info( + "Going to create replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + + ZkNodeProps msg = + sourceReplica + .toFullProps() + .plus("parallel", String.valueOf(parallel)) + .plus(CoreAdminParams.NODE, targetNode); + if (asyncId != null) msg.getProperties().put(ASYNC, asyncId); + NamedList<Object> nl = new NamedList<>(); + final ZkNodeProps addedReplica = + new AddReplicaCmd(ccc) + .addReplica( + clusterState, + msg, + nl, + () -> { + countDownLatch.countDown(); Review Comment: Should this happen at the end of the method instead of the beginning? Otherwise there is a race condition with the `anyOneFailed` I think? ########## solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.solr.cloud.ActiveReplicaWatcher; +import org.apache.solr.common.SolrCloseableLatch; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.NamedList; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicaMigrationUtils { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Code to migrate replicas to already chosen nodes. This will create new replicas, and delete the + * old replicas after the creation is done. + * + * <p>If an error occurs during the creation of new replicas, all new replicas will be deleted. 
+ * + * @param ccc The collection command context to use from the API that calls this method + * @param movements a map from replica to the new node that the replica should live on + * @param parallel whether the replica creations should be done in parallel + * @param waitForFinalState wait for the final state of all newly created replicas before + * continuing + * @param timeout the amount of time to wait for new replicas to be created + * @param asyncId If provided, the command will be run under the given asyncId + * @param results push results (successful and failure) onto this list + * @return whether the command was successful + */ + static boolean migrateReplicas( + CollectionCommandContext ccc, + Map<Replica, String> movements, + boolean parallel, + boolean waitForFinalState, + int timeout, + String asyncId, + NamedList<Object> results) + throws IOException, InterruptedException, KeeperException { + // how many leaders are we moving? for these replicas we have to make sure that either: + // * another existing replica can become a leader, or + // * we wait until the newly created replica completes recovery (and can become the new leader) + // If waitForFinalState=true we wait for all replicas + int numLeaders = 0; + for (Replica replica : movements.keySet()) { + if (replica.isLeader() || waitForFinalState) { + numLeaders++; + } + } + // map of collectionName_coreNodeName to watchers + Map<String, CollectionStateWatcher> watchers = new HashMap<>(); + List<ZkNodeProps> createdReplicas = new ArrayList<>(); + + AtomicBoolean anyOneFailed = new AtomicBoolean(false); + SolrCloseableLatch countDownLatch = + new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn()); + + SolrCloseableLatch replicasToRecover = + new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn()); + + ClusterState clusterState = ccc.getZkStateReader().getClusterState(); + + for (Map.Entry<Replica, String> movement : movements.entrySet()) { + Replica sourceReplica = 
movement.getKey(); + String targetNode = movement.getValue(); + String sourceCollection = sourceReplica.getCollection(); + if (log.isInfoEnabled()) { + log.info( + "Going to create replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + + ZkNodeProps msg = + sourceReplica + .toFullProps() + .plus("parallel", String.valueOf(parallel)) + .plus(CoreAdminParams.NODE, targetNode); + if (asyncId != null) msg.getProperties().put(ASYNC, asyncId); + NamedList<Object> nl = new NamedList<>(); + final ZkNodeProps addedReplica = + new AddReplicaCmd(ccc) + .addReplica( + clusterState, + msg, + nl, + () -> { + countDownLatch.countDown(); + if (nl.get("failure") != null) { + String errorString = + String.format( + Locale.ROOT, + "Failed to create replica for collection=%s shard=%s" + " on node=%s", + sourceCollection, + sourceReplica.getShard(), + targetNode); + log.warn(errorString); + // one replica creation failed. Make the best attempt to + // delete all the replicas created so far in the target + // and exit + synchronized (results) { + results.add("failure", errorString); + anyOneFailed.set(true); + } + } else { + if (log.isDebugEnabled()) { + log.debug( + "Successfully created replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + } + }) + .get(0); + + if (addedReplica != null) { + createdReplicas.add(addedReplica); + if (sourceReplica.isLeader() || waitForFinalState) { + String shardName = sourceReplica.getShard(); + String replicaName = sourceReplica.getName(); + String key = sourceCollection + "_" + replicaName; + CollectionStateWatcher watcher; + if (waitForFinalState) { + watcher = + new ActiveReplicaWatcher( + sourceCollection, + null, + Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), + replicasToRecover); + } else { + watcher = + new LeaderRecoveryWatcher( + sourceCollection, + shardName, + replicaName, + 
addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), + replicasToRecover); + } + watchers.put(key, watcher); + log.debug("--- adding {}, {}", key, watcher); + ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, watcher); + } else { + log.debug("--- not waiting for {}", addedReplica); + } + } + } + + log.debug("Waiting for replicas to be added"); + if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { + log.info("Timed out waiting for replicas to be added"); + anyOneFailed.set(true); + } else { + log.debug("Finished waiting for replicas to be added"); + } + + // now wait for leader replicas to recover + log.debug("Waiting for {} leader replicas to recover", numLeaders); + if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) { Review Comment: Do we need to wait for this if `anyOneFailed` is `true`? ########## solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.solr.cloud.ActiveReplicaWatcher; +import org.apache.solr.common.SolrCloseableLatch; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.NamedList; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicaMigrationUtils { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Code to migrate replicas to already chosen nodes. This will create new replicas, and delete the + * old replicas after the creation is done. + * + * <p>If an error occurs during the creation of new replicas, all new replicas will be deleted. 
+ * + * @param ccc The collection command context to use from the API that calls this method + * @param movements a map from replica to the new node that the replica should live on + * @param parallel whether the replica creations should be done in parallel + * @param waitForFinalState wait for the final state of all newly created replicas before + * continuing + * @param timeout the amount of time to wait for new replicas to be created + * @param asyncId If provided, the command will be run under the given asyncId + * @param results push results (successful and failure) onto this list + * @return whether the command was successful + */ + static boolean migrateReplicas( + CollectionCommandContext ccc, + Map<Replica, String> movements, + boolean parallel, + boolean waitForFinalState, + int timeout, + String asyncId, + NamedList<Object> results) + throws IOException, InterruptedException, KeeperException { + // how many leaders are we moving? for these replicas we have to make sure that either: + // * another existing replica can become a leader, or + // * we wait until the newly created replica completes recovery (and can become the new leader) + // If waitForFinalState=true we wait for all replicas + int numLeaders = 0; + for (Replica replica : movements.keySet()) { + if (replica.isLeader() || waitForFinalState) { + numLeaders++; + } + } + // map of collectionName_coreNodeName to watchers + Map<String, CollectionStateWatcher> watchers = new HashMap<>(); + List<ZkNodeProps> createdReplicas = new ArrayList<>(); + + AtomicBoolean anyOneFailed = new AtomicBoolean(false); + SolrCloseableLatch countDownLatch = + new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn()); + + SolrCloseableLatch replicasToRecover = + new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn()); + + ClusterState clusterState = ccc.getZkStateReader().getClusterState(); + + for (Map.Entry<Replica, String> movement : movements.entrySet()) { + Replica sourceReplica = 
movement.getKey(); + String targetNode = movement.getValue(); + String sourceCollection = sourceReplica.getCollection(); + if (log.isInfoEnabled()) { + log.info( + "Going to create replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + + ZkNodeProps msg = + sourceReplica + .toFullProps() + .plus("parallel", String.valueOf(parallel)) + .plus(CoreAdminParams.NODE, targetNode); + if (asyncId != null) msg.getProperties().put(ASYNC, asyncId); + NamedList<Object> nl = new NamedList<>(); + final ZkNodeProps addedReplica = + new AddReplicaCmd(ccc) + .addReplica( + clusterState, + msg, + nl, + () -> { + countDownLatch.countDown(); + if (nl.get("failure") != null) { + String errorString = + String.format( + Locale.ROOT, + "Failed to create replica for collection=%s shard=%s" + " on node=%s", + sourceCollection, + sourceReplica.getShard(), + targetNode); + log.warn(errorString); + // one replica creation failed. Make the best attempt to + // delete all the replicas created so far in the target + // and exit + synchronized (results) { + results.add("failure", errorString); + anyOneFailed.set(true); + } + } else { + if (log.isDebugEnabled()) { + log.debug( + "Successfully created replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + } + }) + .get(0); + + if (addedReplica != null) { + createdReplicas.add(addedReplica); + if (sourceReplica.isLeader() || waitForFinalState) { + String shardName = sourceReplica.getShard(); + String replicaName = sourceReplica.getName(); + String key = sourceCollection + "_" + replicaName; + CollectionStateWatcher watcher; + if (waitForFinalState) { + watcher = + new ActiveReplicaWatcher( + sourceCollection, + null, + Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), + replicasToRecover); + } else { + watcher = + new LeaderRecoveryWatcher( + sourceCollection, + shardName, + replicaName, + 
addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), + replicasToRecover); + } + watchers.put(key, watcher); + log.debug("--- adding {}, {}", key, watcher); + ccc.getZkStateReader().registerCollectionStateWatcher(sourceCollection, watcher); + } else { + log.debug("--- not waiting for {}", addedReplica); + } + } + } + + log.debug("Waiting for replicas to be added"); + if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { + log.info("Timed out waiting for replicas to be added"); + anyOneFailed.set(true); + } else { + log.debug("Finished waiting for replicas to be added"); + } + + // now wait for leader replicas to recover + log.debug("Waiting for {} leader replicas to recover", numLeaders); + if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) { + if (log.isInfoEnabled()) { + log.info( + "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount()); + } + anyOneFailed.set(true); + } else { + log.debug("Finished waiting for leader replicas to recover"); + } + // remove the watchers, we're done either way + for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) { + ccc.getZkStateReader().removeCollectionStateWatcher(e.getKey(), e.getValue()); + } + if (anyOneFailed.get()) { + log.info("Failed to create some replicas. 
Cleaning up all replicas on target node"); + SolrCloseableLatch cleanupLatch = + new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn()); + for (ZkNodeProps createdReplica : createdReplicas) { + NamedList<Object> deleteResult = new NamedList<>(); + try { + new DeleteReplicaCmd(ccc) + .deleteReplica( + ccc.getZkStateReader().getClusterState(), + createdReplica.plus("parallel", "true"), + deleteResult, + () -> { + cleanupLatch.countDown(); + if (deleteResult.get("failure") != null) { + synchronized (results) { + results.add( + "failure", + "Could not cleanup, because of : " + deleteResult.get("failure")); + } + } + }); + } catch (KeeperException e) { + cleanupLatch.countDown(); + log.warn("Error deleting replica ", e); + } catch (Exception e) { + log.warn("Error deleting replica ", e); + cleanupLatch.countDown(); + throw e; + } + } + cleanupLatch.await(5, TimeUnit.MINUTES); + return false; + } + + // we have reached this far, meaning all replicas should have been recreated. + // now cleanup the original replicas + return cleanupReplicas( + results, ccc.getZkStateReader().getClusterState(), movements.keySet(), ccc, asyncId); + } + + static boolean cleanupReplicas( + NamedList<Object> results, + ClusterState clusterState, + Collection<Replica> sourceReplicas, + CollectionCommandContext ccc, + String async) + throws IOException, InterruptedException { + CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size()); Review Comment: We don't use `SolrCloseableLatch` here? ########## solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java: ########## @@ -251,1006 +231,463 @@ private AffinityPlacementPlugin( // We make things reproducible in tests by using test seed if any String seed = System.getProperty("tests.seed"); Review Comment: Leftover? 
########## solr/core/src/java/org/apache/solr/cloud/api/collections/ReplicaMigrationUtils.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.solr.cloud.ActiveReplicaWatcher; +import org.apache.solr.common.SolrCloseableLatch; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.NamedList; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +public class ReplicaMigrationUtils { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Code to migrate replicas to already chosen nodes. This will create new replicas, and delete the + * old replicas after the creation is done. + * + * <p>If an error occurs during the creation of new replicas, all new replicas will be deleted. + * + * @param ccc The collection command context to use from the API that calls this method + * @param movements a map from replica to the new node that the replica should live on + * @param parallel whether the replica creations should be done in parallel + * @param waitForFinalState wait for the final state of all newly created replicas before + * continuing + * @param timeout the amount of time to wait for new replicas to be created + * @param asyncId If provided, the command will be run under the given asyncId + * @param results push results (successful and failure) onto this list + * @return whether the command was successful + */ + static boolean migrateReplicas( + CollectionCommandContext ccc, + Map<Replica, String> movements, + boolean parallel, + boolean waitForFinalState, + int timeout, + String asyncId, + NamedList<Object> results) + throws IOException, InterruptedException, KeeperException { + // how many leaders are we moving? 
for these replicas we have to make sure that either: + // * another existing replica can become a leader, or + // * we wait until the newly created replica completes recovery (and can become the new leader) + // If waitForFinalState=true we wait for all replicas + int numLeaders = 0; + for (Replica replica : movements.keySet()) { + if (replica.isLeader() || waitForFinalState) { + numLeaders++; + } + } + // map of collectionName_coreNodeName to watchers + Map<String, CollectionStateWatcher> watchers = new HashMap<>(); + List<ZkNodeProps> createdReplicas = new ArrayList<>(); + + AtomicBoolean anyOneFailed = new AtomicBoolean(false); + SolrCloseableLatch countDownLatch = + new SolrCloseableLatch(movements.size(), ccc.getCloseableToLatchOn()); + + SolrCloseableLatch replicasToRecover = + new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn()); + + ClusterState clusterState = ccc.getZkStateReader().getClusterState(); + + for (Map.Entry<Replica, String> movement : movements.entrySet()) { + Replica sourceReplica = movement.getKey(); + String targetNode = movement.getValue(); + String sourceCollection = sourceReplica.getCollection(); + if (log.isInfoEnabled()) { + log.info( + "Going to create replica for collection={} shard={} on node={}", + sourceCollection, + sourceReplica.getShard(), + targetNode); + } + + ZkNodeProps msg = + sourceReplica + .toFullProps() + .plus("parallel", String.valueOf(parallel)) + .plus(CoreAdminParams.NODE, targetNode); + if (asyncId != null) msg.getProperties().put(ASYNC, asyncId); + NamedList<Object> nl = new NamedList<>(); + final ZkNodeProps addedReplica = + new AddReplicaCmd(ccc) + .addReplica( + clusterState, + msg, + nl, + () -> { + countDownLatch.countDown(); + if (nl.get("failure") != null) { + String errorString = + String.format( + Locale.ROOT, + "Failed to create replica for collection=%s shard=%s" + " on node=%s", Review Comment: No need to have two strings here ########## 
solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java: ########## @@ -135,190 +100,35 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu assignRequests.add(assignRequest); } Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ccc.getCoreContainer()); - replicaPositions = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests); - } - int replicaPositionIdx = 0; - for (ZkNodeProps sourceReplica : sourceReplicas) { - String sourceCollection = sourceReplica.getStr(COLLECTION_PROP); - if (log.isInfoEnabled()) { - log.info( - "Going to create replica for collection={} shard={} on node={}", - sourceCollection, - sourceReplica.getStr(SHARD_ID_PROP), - target); - } - String targetNode; - // Use the assigned replica positions, if target is null or empty (checked above) - if (replicaPositions != null) { - targetNode = replicaPositions.get(replicaPositionIdx).node; - replicaPositionIdx++; - } else { - targetNode = target; - } - ZkNodeProps msg = - sourceReplica - .plus("parallel", String.valueOf(parallel)) - .plus(CoreAdminParams.NODE, targetNode); - if (async != null) msg.getProperties().put(ASYNC, async); - NamedList<Object> nl = new NamedList<>(); - final ZkNodeProps addedReplica = - new AddReplicaCmd(ccc) - .addReplica( - clusterState, - msg, - nl, - () -> { - countDownLatch.countDown(); - if (nl.get("failure") != null) { - String errorString = - String.format( - Locale.ROOT, - "Failed to create replica for collection=%s shard=%s" + " on node=%s", - sourceCollection, - sourceReplica.getStr(SHARD_ID_PROP), - target); - log.warn(errorString); - // one replica creation failed. 
Make the best attempt to - // delete all the replicas created so far in the target - // and exit - synchronized (results) { - results.add("failure", errorString); - anyOneFailed.set(true); - } - } else { - if (log.isDebugEnabled()) { - log.debug( - "Successfully created replica for collection={} shard={} on node={}", - sourceCollection, - sourceReplica.getStr(SHARD_ID_PROP), - target); - } - } - }) - .get(0); - - if (addedReplica != null) { - createdReplicas.add(addedReplica); - if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) { - String shardName = sourceReplica.getStr(SHARD_ID_PROP); - String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP); - String collectionName = sourceCollection; - String key = collectionName + "_" + replicaName; - CollectionStateWatcher watcher; - if (waitForFinalState) { - watcher = - new ActiveReplicaWatcher( - collectionName, - null, - Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), - replicasToRecover); - } else { - watcher = - new LeaderRecoveryWatcher( - collectionName, - shardName, - replicaName, - addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), - replicasToRecover); - } - watchers.put(key, watcher); - log.debug("--- adding {}, {}", key, watcher); - zkStateReader.registerCollectionStateWatcher(collectionName, watcher); - } else { - log.debug("--- not waiting for {}", addedReplica); - } + List<ReplicaPosition> replicaPositions = + assignStrategy.assign(ccc.getSolrCloudManager(), assignRequests); + int position = 0; + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, replicaPositions.get(position++).node); } - } - - log.debug("Waiting for replicas to be added"); - if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { - log.info("Timed out waiting for replicas to be added"); - anyOneFailed.set(true); - } else { - log.debug("Finished waiting for replicas to be added"); - } - - // now wait for leader replicas to recover - 
log.debug("Waiting for {} leader replicas to recover", numLeaders); - if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) { - if (log.isInfoEnabled()) { - log.info( - "Timed out waiting for {} leader replicas to recover", replicasToRecover.getCount()); - } - anyOneFailed.set(true); } else { - log.debug("Finished waiting for leader replicas to recover"); - } - // remove the watchers, we're done either way - for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) { - zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue()); - } - if (anyOneFailed.get()) { - log.info("Failed to create some replicas. Cleaning up all replicas on target node"); - SolrCloseableLatch cleanupLatch = - new SolrCloseableLatch(createdReplicas.size(), ccc.getCloseableToLatchOn()); - for (ZkNodeProps createdReplica : createdReplicas) { - NamedList<Object> deleteResult = new NamedList<>(); - try { - new DeleteReplicaCmd(ccc) - .deleteReplica( - zkStateReader.getClusterState(), - createdReplica.plus("parallel", "true"), - deleteResult, - () -> { - cleanupLatch.countDown(); - if (deleteResult.get("failure") != null) { - synchronized (results) { - results.add( - "failure", - "Could not cleanup, because of : " + deleteResult.get("failure")); - } - } - }); - } catch (KeeperException e) { - cleanupLatch.countDown(); - log.warn("Error deleting replica ", e); - } catch (Exception e) { - log.warn("Error deleting replica ", e); - cleanupLatch.countDown(); - throw e; - } + for (Replica sourceReplica : sourceReplicas) { + replicaMovements.put(sourceReplica, target); } - cleanupLatch.await(5, TimeUnit.MINUTES); - return; } - // we have reached this far means all replicas could be recreated - // now cleanup the replicas in the source node - DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ccc, source, async); - results.add( - "success", - "REPLACENODE action completed successfully from : " + source + " to : " + target); + boolean migrationSuccessful = + 
ReplicaMigrationUtils.migrateReplicas( + ccc, replicaMovements, parallel, waitForFinalState, timeout, async, results); + if (migrationSuccessful) { + results.add( + "success", + "REPLACENODE action completed successfully from : " + source + " to : " + target); Review Comment: I know it's not new, but this log line is misleading in the async case (and maybe when waitForFinalState=false, too). Should we at least also include the value of async? Or maybe have something like: ``` "REPLACENODE action " + (async == null ? "completed": "submitted") + " successfully from : " ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org For additional commands, e-mail: issues-h...@solr.apache.org