HoustonPutman commented on code in PR #1650: URL: https://github.com/apache/solr/pull/1650#discussion_r1224379842
########## solr/core/src/java/org/apache/solr/cluster/placement/plugins/AffinityPlacementFactory.java: ########## @@ -251,1006 +231,463 @@ private AffinityPlacementPlugin( // We make things reproducible in tests by using test seed if any String seed = System.getProperty("tests.seed"); - if (seed != null) { - replicaPlacementRandom.setSeed(seed.hashCode()); - } - } - - @Override - @SuppressForbidden( - reason = - "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") - public List<PlacementPlan> computePlacements( - Collection<PlacementRequest> requests, PlacementContext placementContext) - throws PlacementException { - List<PlacementPlan> placementPlans = new ArrayList<>(requests.size()); - Set<Node> allNodes = new HashSet<>(); - Set<SolrCollection> allCollections = new HashSet<>(); - for (PlacementRequest request : requests) { - allNodes.addAll(request.getTargetNodes()); - allCollections.add(request.getCollection()); - } - - // Fetch attributes for a superset of all nodes requested amongst the placementRequests - AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher(); - attributeFetcher - .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) - .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); - attributeFetcher - .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES) - .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB); - Set<ReplicaMetric<?>> replicaMetrics = Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB); - for (SolrCollection collection : allCollections) { - attributeFetcher.requestCollectionMetrics(collection, replicaMetrics); - } - attributeFetcher.fetchFrom(allNodes); - final AttributeValues attrValues = attributeFetcher.fetchAttributes(); - // Get the number of currently existing cores per node, so we can update as we 
place new cores - // to not end up always selecting the same node(s). This is used across placement requests - Map<Node, Integer> allCoresOnNodes = getCoreCountPerNode(allNodes, attrValues); - - boolean doSpreadAcrossDomains = shouldSpreadAcrossDomains(allNodes, attrValues); - - // Keep track with nodesWithReplicas across requests - Map<String, Map<String, Set<Node>>> allNodesWithReplicas = new HashMap<>(); - for (PlacementRequest request : requests) { - Set<Node> nodes = request.getTargetNodes(); - SolrCollection solrCollection = request.getCollection(); - - // filter out nodes that don't meet the `withCollection` constraint - nodes = - filterNodesWithCollection(placementContext.getCluster(), request, attrValues, nodes); - // filter out nodes that don't match the "node types" specified in the collection props - nodes = filterNodesByNodeType(placementContext.getCluster(), request, attrValues, nodes); - - // All available zones of live nodes. Due to some nodes not being candidates for placement, - // and some existing replicas being one availability zones that might be offline (i.e. their - // nodes are not live), this set might contain zones on which it is impossible to place - // replicas. That's ok. - Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues); - - // Build the replica placement decisions here - Set<ReplicaPlacement> replicaPlacements = new HashSet<>(); - - // Let's now iterate on all shards to create replicas for and start finding home sweet homes - // for the replicas - for (String shardName : request.getShardNames()) { - ReplicaMetrics leaderMetrics = - attrValues - .getCollectionMetrics(solrCollection.getName()) - .flatMap(colMetrics -> colMetrics.getShardMetrics(shardName)) - .flatMap(ShardMetrics::getLeaderMetrics) - .orElse(null); - - // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can - // overlap - // if nodes accept multiple replica types). 
These subsets sets are actually maps, because - // we - // capture the number of cores (of any replica type) present on each node. - // - // This also filters out nodes that will not satisfy the rules if the replica is placed - // there - EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = - getAvailableNodesForReplicaTypes(nodes, attrValues, leaderMetrics); - - // Inventory nodes (if any) that already have a replica of any type for the shard, because - // we can't be placing additional replicas on these. This data structure is updated after - // each replica to node assign and is used to make sure different replica types are not - // allocated to the same nodes (protecting same node assignments within a given replica - // type is done "by construction" in makePlacementDecisions()). - Set<Node> nodesWithReplicas = - allNodesWithReplicas - .computeIfAbsent(solrCollection.getName(), col -> new HashMap<>()) - .computeIfAbsent( - shardName, - s -> { - Set<Node> newNodeSet = new HashSet<>(); - Shard shard = solrCollection.getShard(s); - if (shard != null) { - // Prefill the set with the existing replicas - for (Replica r : shard.replicas()) { - newNodeSet.add(r.getNode()); - } - } - return newNodeSet; - }); - - // Iterate on the replica types in the enum order. We place more strategic replicas first - // (NRT is more strategic than TLOG more strategic than PULL). This is in case we - // eventually decide that less strategic replica placement impossibility is not a problem - // that should lead to replica placement computation failure. Current code does fail if - // placement is impossible (constraint is at most one replica of a shard on any node). 
- for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - int numReplicasToCreate = request.getCountReplicasToCreate(replicaType); - if (numReplicasToCreate > 0) { - makePlacementDecisions( - solrCollection, - shardName, - availabilityZones, - replicaType, - numReplicasToCreate, - attrValues, - leaderMetrics, - replicaTypeToNodes, - nodesWithReplicas, - allCoresOnNodes, - placementContext.getPlacementPlanFactory(), - replicaPlacements, - doSpreadAcrossDomains); - } - } - } - placementPlans.add( - placementContext - .getPlacementPlanFactory() - .createPlacementPlan(request, replicaPlacements)); - } - - return placementPlans; - } - - private boolean shouldSpreadAcrossDomains(Set<Node> allNodes, AttributeValues attrValues) { - boolean doSpreadAcrossDomains = - spreadAcrossDomains && spreadDomainPropPresent(allNodes, attrValues); - if (spreadAcrossDomains && !doSpreadAcrossDomains) { - log.warn( - "AffinityPlacementPlugin configured to spread across domains, but there are nodes in the cluster without the {} system property. 
Ignoring spreadAcrossDomains.", - AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); - } - return doSpreadAcrossDomains; - } - - private boolean spreadDomainPropPresent(Set<Node> allNodes, AttributeValues attrValues) { - // We can only use spread domains if all nodes have the system property - return allNodes.stream() - .noneMatch( - n -> - attrValues - .getSystemProperty(n, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) - .isEmpty()); } @Override - public void verifyAllowedModification( - ModificationRequest modificationRequest, PlacementContext placementContext) - throws PlacementModificationException { - if (modificationRequest instanceof DeleteShardsRequest) { - log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest); - } else if (modificationRequest instanceof DeleteCollectionRequest) { - verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext); - } else if (modificationRequest instanceof DeleteReplicasRequest) { - verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext); - } else { - log.warn("unsupported request type, skipping: {}", modificationRequest); - } - } - - private void verifyDeleteCollection( + protected void verifyDeleteCollection( DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext) throws PlacementModificationException { Cluster cluster = placementContext.getCluster(); - Set<String> colocatedCollections = - colocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of()); - for (String primaryName : colocatedCollections) { + Set<String> collocatedCollections = + collocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of()); + for (String primaryName : collocatedCollections) { try { if (cluster.getCollection(primaryName) != null) { // still exists throw new PlacementModificationException( - "colocated collection " + "collocated collection " + primaryName + " of " + 
deleteCollectionRequest.getCollection().getName() + " still present"); } } catch (IOException e) { throw new PlacementModificationException( - "failed to retrieve colocated collection information", e); + "failed to retrieve collocated collection information", e); } } } - private void verifyDeleteReplicas( - DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext) - throws PlacementModificationException { - Cluster cluster = placementContext.getCluster(); - SolrCollection secondaryCollection = deleteReplicasRequest.getCollection(); - Set<String> colocatedCollections = colocatedWith.get(secondaryCollection.getName()); - if (colocatedCollections == null) { - return; - } - Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new HashMap<>(); - secondaryCollection - .shards() - .forEach( - shard -> - shard - .replicas() - .forEach( - replica -> - secondaryNodeShardReplicas - .computeIfAbsent(replica.getNode(), n -> new HashMap<>()) - .computeIfAbsent( - replica.getShard().getShardName(), s -> new AtomicInteger()) - .incrementAndGet())); + private static final class AffinityPlacementContext { + private final Set<String> allSpreadDomains = new HashSet<>(); + private final Map<String, Map<String, ReplicaSpread>> spreadDomainUsage = new HashMap<>(); + private final Set<String> allAvailabilityZones = new HashSet<>(); + private final Map<String, Map<String, Map<Replica.ReplicaType, ReplicaSpread>>> + availabilityZoneUsage = new HashMap<>(); + private boolean doSpreadAcrossDomains; + } - // find the colocated-with collections - Map<Node, Set<String>> colocatingNodes = new HashMap<>(); - try { - for (String colocatedCollection : colocatedCollections) { - SolrCollection coll = cluster.getCollection(colocatedCollection); - coll.shards() - .forEach( - shard -> - shard - .replicas() - .forEach( - replica -> - colocatingNodes - .computeIfAbsent(replica.getNode(), n -> new HashSet<>()) - .add(coll.getName()))); - } - } catch (IOException ioe) { - 
throw new PlacementModificationException( - "failed to retrieve colocated collection information", ioe); - } - PlacementModificationException exception = null; - for (Replica replica : deleteReplicasRequest.getReplicas()) { - if (!colocatingNodes.containsKey(replica.getNode())) { - continue; - } - // check that there will be at least one replica remaining - AtomicInteger secondaryCount = - secondaryNodeShardReplicas - .getOrDefault(replica.getNode(), Map.of()) - .getOrDefault(replica.getShard().getShardName(), new AtomicInteger()); - if (secondaryCount.get() > 1) { - // we can delete it - record the deletion - secondaryCount.decrementAndGet(); - continue; - } - // fail - this replica cannot be removed - if (exception == null) { - exception = new PlacementModificationException("delete replica(s) rejected"); + @Override + protected Map<Node, WeightedNode> getBaseWeightedNodes( + PlacementContext placementContext, + Set<Node> nodes, + Iterable<SolrCollection> relevantCollections, + boolean skipNodesWithErrors) + throws PlacementException { + // Fetch attributes for a superset of all nodes requested amongst the placementRequests + AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher(); + attributeFetcher + .requestNodeSystemProperty(AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.NODE_TYPE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) + .requestNodeSystemProperty(AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); + attributeFetcher + .requestNodeMetric(BuiltInMetrics.NODE_NUM_CORES) + .requestNodeMetric(BuiltInMetrics.NODE_FREE_DISK_GB); + Set<ReplicaMetric<?>> replicaMetrics = Set.of(BuiltInMetrics.REPLICA_INDEX_SIZE_GB); + Set<String> requestedCollections = new HashSet<>(); + for (SolrCollection collection : relevantCollections) { + if (requestedCollections.add(collection.getName())) { + attributeFetcher.requestCollectionMetrics(collection, replicaMetrics); 
} - exception.addRejectedModification( - replica.toString(), - "co-located with replicas of " + colocatingNodes.get(replica.getNode())); - } - if (exception != null) { - throw exception; } - } + attributeFetcher.fetchFrom(nodes); + final AttributeValues attrValues = attributeFetcher.fetchAttributes(); - private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) { - Set<String> azs = new HashSet<>(); + AffinityPlacementContext affinityPlacementContext = new AffinityPlacementContext(); + affinityPlacementContext.doSpreadAcrossDomains = spreadAcrossDomains; - for (Node n : nodes) { - azs.add(getNodeAZ(n, attrValues)); + Map<Node, WeightedNode> affinityNodeMap = CollectionUtil.newHashMap(nodes.size()); + for (Node node : nodes) { + AffinityNode affinityNode = + newNodeFromMetrics(node, attrValues, affinityPlacementContext, skipNodesWithErrors); + if (affinityNode != null) { + affinityNodeMap.put(node, affinityNode); + } } - return Collections.unmodifiableSet(azs); + return affinityNodeMap; } - /** - * Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property - * {@link AffinityPlacementConfig#AVAILABILITY_ZONE_SYSPROP} to then return {@link - * AffinityPlacementConfig#UNDEFINED_AVAILABILITY_ZONE} as the AZ name. - */ - private String getNodeAZ(Node n, final AttributeValues attrValues) { - Optional<String> nodeAz = - attrValues.getSystemProperty(n, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP); - // All nodes with undefined AZ will be considered part of the same AZ. This also works for - // deployments that do not care about AZ's - return nodeAz.orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE); - } - - /** - * This class captures an availability zone and the nodes that are legitimate targets for - * replica placement in that Availability Zone. Instances are used as values in a {@link - * java.util.TreeMap} in which the total number of already existing replicas in the AZ is the - * key. 
This allows easily picking the set of nodes from which to select a node for placement in - * order to balance the number of replicas per AZ. Picking one of the nodes from the set is done - * using different criteria unrelated to the Availability Zone (picking the node is based on the - * {@link CoresAndDiskComparator} ordering). - */ - private static class AzWithNodes { - final String azName; - private final boolean useSpreadDomains; - private boolean listIsSorted = false; - private final Comparator<Node> nodeComparator; - private final Random random; - private final List<Node> availableNodesForPlacement; - private final AttributeValues attributeValues; - private final ReplicaMetrics leaderMetrics; - private TreeSet<SpreadDomainWithNodes> sortedSpreadDomains; - private final Map<String, Integer> currentSpreadDomainUsageUsage; - private int numNodesForPlacement; - - AzWithNodes( - String azName, - List<Node> availableNodesForPlacement, - boolean useSpreadDomains, - Comparator<Node> nodeComparator, - Random random, - AttributeValues attributeValues, - ReplicaMetrics leaderMetrics, - Map<String, Integer> currentSpreadDomainUsageUsage) { - this.azName = azName; - this.availableNodesForPlacement = availableNodesForPlacement; - this.useSpreadDomains = useSpreadDomains; - this.nodeComparator = nodeComparator; - this.random = random; - this.attributeValues = attributeValues; - this.leaderMetrics = leaderMetrics; - this.currentSpreadDomainUsageUsage = currentSpreadDomainUsageUsage; - this.numNodesForPlacement = availableNodesForPlacement.size(); - } - - private boolean hasBeenSorted() { - return (useSpreadDomains && sortedSpreadDomains != null) - || (!useSpreadDomains && listIsSorted); - } - - void ensureSorted() { - if (!hasBeenSorted()) { - sort(); - } + AffinityNode newNodeFromMetrics( + Node node, + AttributeValues attrValues, + AffinityPlacementContext affinityPlacementContext, + boolean skipNodesWithErrors) + throws PlacementException { + Set<Replica.ReplicaType> 
supportedReplicaTypes = + attrValues.getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP).stream() + .flatMap(s -> Arrays.stream(s.split(","))) + .map(String::trim) + .map(s -> s.toUpperCase(Locale.ROOT)) + .map( + s -> { + try { + return Replica.ReplicaType.valueOf(s); + } catch (IllegalArgumentException e) { + log.warn( + "Node {} has an invalid value for the {} systemProperty: {}", + node.getName(), + AffinityPlacementConfig.REPLICA_TYPE_SYSPROP, + s); + return null; + } + }) + .collect(Collectors.toSet()); + if (supportedReplicaTypes.isEmpty()) { + // If property not defined or is only whitespace on a node, assuming node can take any + // replica type + supportedReplicaTypes = Set.of(Replica.ReplicaType.values()); } - private void sort() { - assert !listIsSorted && sortedSpreadDomains == null - : "We shouldn't be sorting this list again"; - - // Make sure we do not tend to use always the same nodes (within an AZ) if all - // conditions are identical (well, this likely is not the case since after having added - // a replica to a node its number of cores increases for the next placement decision, - // but let's be defensive here, given that multiple concurrent placement decisions might - // see the same initial cluster state, and we want placement to be reasonable even in - // that case without creating an unnecessary imbalance). For example, if all nodes have - // 0 cores and same amount of free disk space, ideally we want to pick a random node for - // placement, not always the same one due to some internal ordering. - Collections.shuffle(availableNodesForPlacement, random); - - if (useSpreadDomains) { - // When we use spread domains, we don't just sort the list of nodes, instead we generate a - // TreeSet of SpreadDomainWithNodes, - // sorted by the number of times the domain has been used. 
Each - // SpreadDomainWithNodes internally contains the list of nodes that belong to that - // particular domain, - // and it's sorted internally by the comparator passed to this - // class (which is the same that's used when not using spread domains). - // Whenever a node from a particular SpreadDomainWithNodes is selected as the best - // candidate, the call to "removeBestNode" will: - // 1. Remove the SpreadDomainWithNodes instance from the TreeSet - // 2. Remove the best node from the list within the SpreadDomainWithNodes - // 3. Increment the count of times the domain has been used - // 4. Re-add the SpreadDomainWithNodes instance to the TreeSet if there are still nodes - // available - HashMap<String, List<Node>> spreadDomainToListOfNodesMap = new HashMap<>(); - for (Node node : availableNodesForPlacement) { - spreadDomainToListOfNodesMap - .computeIfAbsent( - attributeValues - .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) - .get(), - k -> new ArrayList<>()) - .add(node); - } - sortedSpreadDomains = - new TreeSet<>(new SpreadDomainComparator(currentSpreadDomainUsageUsage)); - - int i = 0; - for (Map.Entry<String, List<Node>> entry : spreadDomainToListOfNodesMap.entrySet()) { - // Sort the nodes within the spread domain by the provided comparator - entry.getValue().sort(nodeComparator); - sortedSpreadDomains.add( - new SpreadDomainWithNodes(entry.getKey(), entry.getValue(), i++, nodeComparator)); + Set<String> nodeType; + Optional<String> nodePropOpt = + attrValues.getSystemProperty(node, AffinityPlacementConfig.NODE_TYPE_SYSPROP); + if (nodePropOpt.isEmpty()) { + nodeType = Collections.emptySet(); + } else { + nodeType = new HashSet<>(StrUtils.splitSmart(nodePropOpt.get(), ',')); + } + + Optional<Double> nodeFreeDiskGB = + attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB); + Optional<Integer> nodeNumCores = + attrValues.getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES); + String az = + attrValues + 
.getSystemProperty(node, AffinityPlacementConfig.AVAILABILITY_ZONE_SYSPROP) + .orElse(AffinityPlacementConfig.UNDEFINED_AVAILABILITY_ZONE); + affinityPlacementContext.allAvailabilityZones.add(az); + String spreadDomain; + if (affinityPlacementContext.doSpreadAcrossDomains) { + spreadDomain = + attrValues + .getSystemProperty(node, AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) + .orElse(null); + if (spreadDomain == null) { + if (log.isWarnEnabled()) { + log.warn( + "AffinityPlacementPlugin configured to spread across domains, but node {} does not have the {} system property. Ignoring spreadAcrossDomains.", + node.getName(), + AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP); } + affinityPlacementContext.doSpreadAcrossDomains = false; + affinityPlacementContext.allSpreadDomains.clear(); } else { - availableNodesForPlacement.sort(nodeComparator); - listIsSorted = true; + affinityPlacementContext.allSpreadDomains.add(spreadDomain); } + } else { + spreadDomain = null; } - - Node getBestNode() { - assert hasBeenSorted(); - if (useSpreadDomains) { - return sortedSpreadDomains.first().sortedNodesForPlacement.get(0); - } else { - return availableNodesForPlacement.get(0); + if (nodeFreeDiskGB.isEmpty() && skipNodesWithErrors) { + if (log.isWarnEnabled()) { + log.warn( + "Unknown free disk on node {}, excluding it from placement decisions.", + node.getName()); } - } - - public Node removeBestNode() { - assert hasBeenSorted(); - this.numNodesForPlacement--; - Node n; - if (useSpreadDomains) { - // Since this SpreadDomainWithNodes needs to be re-sorted in the sortedSpreadDomains, we - // remove it and then re-add it, once the best node has been removed. 
- SpreadDomainWithNodes group = sortedSpreadDomains.pollFirst(); - n = group.sortedNodesForPlacement.remove(0); - this.currentSpreadDomainUsageUsage.merge(group.spreadDomainName, 1, Integer::sum); - if (!group.sortedNodesForPlacement.isEmpty()) { - sortedSpreadDomains.add(group); - } - } else { - n = availableNodesForPlacement.remove(0); + return null; + } else if (nodeNumCores.isEmpty() && skipNodesWithErrors) { + if (log.isWarnEnabled()) { + log.warn( + "Unknown number of cores on node {}, excluding it from placement decisions.", + node.getName()); } - Optional.ofNullable(leaderMetrics) - .flatMap(lrm -> lrm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB)) - .ifPresent( - indexSize -> - attributeValues.decreaseNodeMetric( - n, BuiltInMetrics.NODE_FREE_DISK_GB, indexSize)); - attributeValues.increaseNodeMetric(n, BuiltInMetrics.NODE_NUM_CORES, 1); - return n; - } - - public int numNodes() { - return this.numNodesForPlacement; + return null; + } else { + return new AffinityNode( + node, + attrValues, + affinityPlacementContext, + supportedReplicaTypes, + nodeType, + nodeNumCores.orElse(0), + nodeFreeDiskGB.orElse(0D), + az, + spreadDomain); } } - /** - * This class represents group of nodes with the same {@link - * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. - */ - static class SpreadDomainWithNodes implements Comparable<SpreadDomainWithNodes> { + private class AffinityNode extends WeightedNode { - /** - * This is the label that all nodes in this group have in {@link - * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. - */ - final String spreadDomainName; + private final AttributeValues attrValues; - /** - * The list of all nodes that contain the same {@link - * AffinityPlacementConfig#SPREAD_DOMAIN_SYSPROP} label. They must be sorted before creating - * this class. 
- */ - private final List<Node> sortedNodesForPlacement; + private final AffinityPlacementContext affinityPlacementContext; - /** - * This is used for tie breaking the sort of {@link SpreadDomainWithNodes}, when the - * nodeComparator between the top nodes of each group return 0. - */ - private final int tieBreaker; + private final Set<Replica.ReplicaType> supportedReplicaTypes; + private final Set<String> nodeType; - /** - * This is the comparator that is used to compare the top nodes in the {@link - * #sortedNodesForPlacement} lists. Must be the same that was used to sort {@link - * #sortedNodesForPlacement}. - */ - private final Comparator<Node> nodeComparator; + private int coresOnNode; + private double nodeFreeDiskGB; - public SpreadDomainWithNodes( - String spreadDomainName, - List<Node> sortedNodesForPlacement, - int tieBreaker, - Comparator<Node> nodeComparator) { - this.spreadDomainName = spreadDomainName; - this.sortedNodesForPlacement = sortedNodesForPlacement; - this.tieBreaker = tieBreaker; - this.nodeComparator = nodeComparator; + private final String availabilityZone; + private final String spreadDomain; + + AffinityNode( + Node node, + AttributeValues attrValues, + AffinityPlacementContext affinityPlacementContext, + Set<Replica.ReplicaType> supportedReplicaTypes, + Set<String> nodeType, + int coresOnNode, + double nodeFreeDiskGB, + String az, + String spreadDomain) { + super(node); + this.attrValues = attrValues; + this.affinityPlacementContext = affinityPlacementContext; + this.supportedReplicaTypes = supportedReplicaTypes; + this.nodeType = nodeType; + this.coresOnNode = coresOnNode; + this.nodeFreeDiskGB = nodeFreeDiskGB; + this.availabilityZone = az; + this.spreadDomain = spreadDomain; } @Override - public int compareTo(SpreadDomainWithNodes o) { - if (o == this) { - return 0; - } - int result = - nodeComparator.compare( - this.sortedNodesForPlacement.get(0), o.sortedNodesForPlacement.get(0)); - if (result == 0) { - return 
Integer.compare(this.tieBreaker, o.tieBreaker); - } - return result; + public int calcWeight() { + return coresOnNode + + 100 * (prioritizedFreeDiskGB > 0 && nodeFreeDiskGB < prioritizedFreeDiskGB ? 1 : 0) + + 10000 * getSpreadDomainWeight() + + 1000000 * getAZWeight(); } - } - - /** - * Builds the number of existing cores on each node returned in the attrValues. Nodes for which - * the number of cores is not available for whatever reason are excluded from acceptable - * candidate nodes as it would not be possible to make any meaningful placement decisions. - * - * @param nodes all nodes on which this plugin should compute placement - * @param attrValues attributes fetched for the nodes. This method uses system property {@link - * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each - * node. - */ - private Map<Node, Integer> getCoreCountPerNode( - Set<Node> nodes, final AttributeValues attrValues) { - Map<Node, Integer> coresOnNodes = new HashMap<>(); - for (Node node : nodes) { - attrValues - .getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES) - .ifPresent(count -> coresOnNodes.put(node, count)); + @Override + public int calcRelevantWeightWithReplica(Replica replica) { + return coresOnNode + + 100 + * (prioritizedFreeDiskGB > 0 + && nodeFreeDiskGB - getProjectedSizeOfReplica(replica) + < prioritizedFreeDiskGB + ? 1 + : 0) + + 10000 * projectReplicaSpreadWeight(replica) + + 1000000 * projectAZWeight(replica); } - return coresOnNodes; - } - - /** - * Given the set of all nodes on which to do placement and fetched attributes, builds the sets - * representing candidate nodes for placement of replicas of each replica type. These sets are - * packaged and returned in an EnumMap keyed by replica type. Nodes for which the number of - * cores is not available for whatever reason are excluded from acceptable candidate nodes as it - * would not be possible to make any meaningful placement decisions. 
- * - * @param nodes all nodes on which this plugin should compute placement - * @param attrValues attributes fetched for the nodes. This method uses system property {@link - * AffinityPlacementConfig#REPLICA_TYPE_SYSPROP} as well as the number of cores on each - * node. - */ - private EnumMap<Replica.ReplicaType, Set<Node>> getAvailableNodesForReplicaTypes( - Set<Node> nodes, final AttributeValues attrValues, final ReplicaMetrics leaderMetrics) { - EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = - new EnumMap<>(Replica.ReplicaType.class); - - for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { - replicaTypeToNodes.put(replicaType, new HashSet<>()); + @Override + public boolean canAddReplica(Replica replica) { + String collection = replica.getShard().getCollection().getName(); + return + // By default, do not allow two replicas of the same shard on a node + super.canAddReplica(replica) + && supportedReplicaTypes.contains(replica.getType()) + && Optional.ofNullable(nodeTypes.get(collection)) + .map(s -> s.stream().anyMatch(nodeType::contains)) + .orElse(true) + && Optional.ofNullable(withCollections.get(collection)) + .map(this::hasCollectionOnNode) + .orElse(true) + && (minimalFreeDiskGB <= 0 + || nodeFreeDiskGB - getProjectedSizeOfReplica(replica) > minimalFreeDiskGB); } - for (Node node : nodes) { - // Exclude nodes with unknown or too small disk free space - Optional<Double> nodeFreeDiskGB = - attrValues.getNodeMetric(node, BuiltInMetrics.NODE_FREE_DISK_GB); - if (nodeFreeDiskGB.isEmpty()) { - if (log.isWarnEnabled()) { - log.warn( - "Unknown free disk on node {}, excluding it from placement decisions.", - node.getName()); - } - // We rely later on the fact that the free disk optional is present (see - // CoresAndDiskComparator), be careful it you change anything here. 
- continue; - } - double replicaIndexSize = - Optional.ofNullable(leaderMetrics) - .flatMap(lm -> lm.getReplicaMetric(BuiltInMetrics.REPLICA_INDEX_SIZE_GB)) - .orElse(0D); - double projectedFreeDiskIfPlaced = - BuiltInMetrics.NODE_FREE_DISK_GB.decrease(nodeFreeDiskGB.get(), replicaIndexSize); - if (projectedFreeDiskIfPlaced < minimalFreeDiskGB) { - if (log.isWarnEnabled()) { - log.warn( - "Node {} free disk ({}GB) minus the projected replica size ({}GB) is lower than configured" - + " minimum {}GB, excluding it from placement decisions.", - node.getName(), - nodeFreeDiskGB.get(), - replicaIndexSize, - minimalFreeDiskGB); - } - continue; - } - - if (attrValues.getNodeMetric(node, BuiltInMetrics.NODE_NUM_CORES).isEmpty()) { - if (log.isWarnEnabled()) { - log.warn( - "Unknown number of cores on node {}, excluding it from placement decisions.", - node.getName()); + @Override + public Map<Replica, String> canRemoveReplicas(Collection<Replica> replicas) { + Map<Replica, String> replicaRemovalExceptions = new HashMap<>(); + Map<String, Map<String, Set<Replica>>> removals = new HashMap<>(); + for (Replica replica : replicas) { + SolrCollection collection = replica.getShard().getCollection(); + Set<String> collocatedCollections = new HashSet<>(); + Optional.ofNullable(collocatedWith.get(collection.getName())) + .ifPresent(collocatedCollections::addAll); + collocatedCollections.retainAll(getCollectionsOnNode()); + if (collocatedCollections.isEmpty()) { + continue; } - // We rely later on the fact that the number of cores optional is present (see - // CoresAndDiskComparator), be careful it you change anything here. - continue; - } - String supportedReplicaTypes = - attrValues - .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) - .isPresent() - ? 
attrValues - .getSystemProperty(node, AffinityPlacementConfig.REPLICA_TYPE_SYSPROP) - .get() - : null; - // If property not defined or is only whitespace on a node, assuming node can take any - // replica type - if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) { - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - replicaTypeToNodes.get(rt).add(node); - } - } else { - Set<String> acceptedTypes = - Arrays.stream(supportedReplicaTypes.split(",")) - .map(String::trim) - .map(s -> s.toLowerCase(Locale.ROOT)) - .collect(Collectors.toSet()); - for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { - if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) { - replicaTypeToNodes.get(rt).add(node); - } + // There are collocatedCollections for this shard, so make sure there is a replica of this + // shard left on the node after it is removed + Set<Replica> replicasRemovedForShard = + removals + .computeIfAbsent( + replica.getShard().getCollection().getName(), k -> new HashMap<>()) + .computeIfAbsent(replica.getShard().getShardName(), k -> new HashSet<>()); + replicasRemovedForShard.add(replica); + + if (replicasRemovedForShard.size() + >= getReplicasForShardOnNode(replica.getShard()).size()) { + replicaRemovalExceptions.put( + replica, "co-located with replicas of " + collocatedCollections); } } + return replicaRemovalExceptions; } - return replicaTypeToNodes; - } - /** - * Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas. - * - * <p>The criteria used in this method are, in this order: - * - * <ol> - * <li>No more than one replica of a given shard on a given node (strictly enforced) - * <li>Balance as much as possible replicas of a given {@link - * org.apache.solr.cluster.Replica.ReplicaType} over available AZ's. This balancing takes - * into account existing replicas <b>of the corresponding replica type</b>, if any. 
- * <li>Place replicas if possible on nodes having more than a certain amount of free disk - * space (note that nodes with a too small amount of free disk space were eliminated as - * placement targets earlier, in {@link #getAvailableNodesForReplicaTypes(Set, - * AttributeValues, ReplicaMetrics)}). There's a threshold here rather than sorting on the - * amount of free disk space, because sorting on that value would in practice lead to - * never considering the number of cores on a node. - * <li>Place replicas on nodes having a smaller number of cores (the number of cores - * considered for this decision includes previous placement decisions made during the - * processing of the placement request) - * </ol> - */ - @SuppressForbidden( - reason = - "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") - private void makePlacementDecisions( - SolrCollection solrCollection, - String shardName, - Set<String> availabilityZones, - Replica.ReplicaType replicaType, - int numReplicas, - final AttributeValues attrValues, - final ReplicaMetrics leaderMetrics, - EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes, - Set<Node> nodesWithReplicas, - Map<Node, Integer> coresOnNodes, - PlacementPlanFactory placementPlanFactory, - Set<ReplicaPlacement> replicaPlacements, - boolean doSpreadAcrossDomains) - throws PlacementException { - // Count existing replicas per AZ. We count only instances of the type of replica for which we - // need to do placement. If we ever want to balance replicas of any type across AZ's (and not - // each replica type balanced independently), we'd have to move this data structure to the - // caller of this method so it can be reused across different replica type placements for a - // given shard. Note then that this change would be risky. For example all NRT's and PULL - // replicas for a shard my be correctly balanced over three AZ's, but then all NRT can end up - // in the same AZ... 
- Map<String, Integer> azToNumReplicas = new HashMap<>(); - for (String az : availabilityZones) { - azToNumReplicas.put(az, 0); + @Override + protected boolean addProjectedReplicaWeights(Replica replica) { + nodeFreeDiskGB -= getProjectedSizeOfReplica(replica); + coresOnNode += 1; + return addReplicaToAzAndSpread(replica); } - // Build the set of candidate nodes for the placement, i.e. nodes that can accept the replica - // type - Set<Node> candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType)); - // Remove nodes that already have a replica for the shard (no two replicas of same shard can - // be put on same node) - candidateNodes.removeAll(nodesWithReplicas); + @Override + protected void initReplicaWeights(Replica replica) { + addReplicaToAzAndSpread(replica); + } - // This Map will include the affinity labels for the nodes that are currently hosting replicas - // of this shard. It will be modified with new placement decisions. - Map<String, Integer> spreadDomainsInUse = new HashMap<>(); - Shard shard = solrCollection.getShard(shardName); - if (shard != null) { - // shard is non null if we're adding replicas to an already existing collection. - // If we're creating the collection, the shards do not exist yet. - for (Replica replica : shard.replicas()) { - // The node's AZ is counted as having a replica if it has a replica of the same type as - // the one we need to place here. - if (replica.getType() == replicaType) { - final String az = getNodeAZ(replica.getNode(), attrValues); - if (azToNumReplicas.containsKey(az)) { - // We do not count replicas on AZ's for which we don't have any node to place on - // because it would not help the placement decision. If we did want to do that, note - // the dereferencing below can't be assumed as the entry will not exist in the map. 
- azToNumReplicas.put(az, azToNumReplicas.get(az) + 1); - } - if (doSpreadAcrossDomains) { - attrValues - .getSystemProperty( - replica.getNode(), AffinityPlacementConfig.SPREAD_DOMAIN_SYSPROP) - .ifPresent(nodeDomain -> spreadDomainsInUse.merge(nodeDomain, 1, Integer::sum)); - } - } + private boolean addReplicaToAzAndSpread(Replica replica) { + boolean needsResort = false; + needsResort |= Review Comment: Instead, I only calculated the azSpread if there is more than one availabilityZone. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@solr.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@solr.apache.org For additional commands, e-mail: issues-help@solr.apache.org