sonatype-lift[bot] commented on code in PR #1650:
URL: https://github.com/apache/solr/pull/1650#discussion_r1222129466


##########
solr/core/src/java/org/apache/solr/cluster/placement/plugins/OrderedNodePlacementPlugin.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.placement.plugins;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
+import org.apache.solr.cluster.Node;
+import org.apache.solr.cluster.Replica;
+import org.apache.solr.cluster.Shard;
+import org.apache.solr.cluster.SolrCollection;
+import org.apache.solr.cluster.placement.BalancePlan;
+import org.apache.solr.cluster.placement.BalanceRequest;
+import org.apache.solr.cluster.placement.DeleteCollectionRequest;
+import org.apache.solr.cluster.placement.DeleteReplicasRequest;
+import org.apache.solr.cluster.placement.DeleteShardsRequest;
+import org.apache.solr.cluster.placement.ModificationRequest;
+import org.apache.solr.cluster.placement.PlacementContext;
+import org.apache.solr.cluster.placement.PlacementException;
+import org.apache.solr.cluster.placement.PlacementModificationException;
+import org.apache.solr.cluster.placement.PlacementPlan;
+import org.apache.solr.cluster.placement.PlacementPlugin;
+import org.apache.solr.cluster.placement.PlacementRequest;
+import org.apache.solr.cluster.placement.ReplicaPlacement;
+import org.apache.solr.common.util.CollectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class OrderedNodePlacementPlugin implements PlacementPlugin {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  public List<PlacementPlan> computePlacements(
+      Collection<PlacementRequest> requests, PlacementContext placementContext)
+      throws PlacementException {
+    List<PlacementPlan> placementPlans = new ArrayList<>(requests.size());
+    Set<Node> allNodes = new HashSet<>();
+    Set<SolrCollection> allCollections = new HashSet<>();
+    for (PlacementRequest request : requests) {
+      allNodes.addAll(request.getTargetNodes());
+      allCollections.add(request.getCollection());
+    }
+    Collection<WeightedNode> weightedNodes =
+        getWeightedNodes(placementContext, allNodes, allCollections).values();
+    for (PlacementRequest request : requests) {
+      int totalReplicasPerShard = 0;
+      for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
+        totalReplicasPerShard += request.getCountReplicasToCreate(rt);
+      }
+
+      List<WeightedNode> nodesForRequest =
+          weightedNodes.stream()
+              .filter(wn -> request.getTargetNodes().contains(wn.getNode()))
+              .collect(Collectors.toList());
+
+      Set<ReplicaPlacement> replicaPlacements =
+          CollectionUtil.newHashSet(totalReplicasPerShard * 
request.getShardNames().size());
+
+      SolrCollection solrCollection = request.getCollection();
+      // Now place randomly all replicas of all shards on available nodes
+      for (String shardName : request.getShardNames()) {
+        log.info("Collection: {}, shard: {}", solrCollection.getName(), 
shardName);
+
+        for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
+          log.info("ReplicaType: {}", replicaType);
+          int replicaCount = request.getCountReplicasToCreate(replicaType);
+          if (replicaCount == 0) {
+            continue;
+          }
+          Replica pr =
+              PlacementPlugin.createProjectedReplica(solrCollection, 
shardName, replicaType, null);
+          PriorityQueue<WeightedNode> nodesForReplicaType = new 
PriorityQueue<>();
+          nodesForRequest.stream()
+              .filter(n -> n.canAddReplica(pr))
+              .forEach(
+                  n -> {
+                    n.sortByRelevantWeightWithReplica(pr);
+                    n.addToSortedCollection(nodesForReplicaType);
+                  });
+
+          if (nodesForReplicaType.size() < replicaCount) {
+            throw new PlacementException(
+                "Not enough eligible nodes to place "
+                    + replicaCount
+                    + " replica(s) of type "
+                    + replicaType
+                    + " for shard "
+                    + shardName
+                    + " of collection "
+                    + solrCollection.getName());
+          }
+
+          int nodesChosen = 0;
+          while (nodesChosen < replicaCount) {
+            if (nodesForReplicaType.isEmpty()) {
+              throw new PlacementException(
+                  "There are not enough nodes to handle request to place 
replica");
+            }
+            WeightedNode node = nodesForReplicaType.poll();
+            while (node.hasWeightChangedSinceSort()) {
+              log.info("Out of date Node: {}", node.getNode());
+              node.addToSortedCollection(nodesForReplicaType);
+              node = nodesForReplicaType.poll();
+            }
+            log.info("Node: {}", node.getNode());
+
+            boolean needsToResort =
+                node.addReplica(
+                    PlacementPlugin.createProjectedReplica(
+                        solrCollection, shardName, replicaType, 
node.getNode()));
+            nodesChosen += 1;
+            replicaPlacements.add(
+                placementContext
+                    .getPlacementPlanFactory()
+                    .createReplicaPlacement(
+                        solrCollection, shardName, node.getNode(), 
replicaType));
+            if (needsToResort) {
+              List<WeightedNode> nodeList = new 
ArrayList<>(nodesForReplicaType);
+              nodesForReplicaType.clear();
+              nodeList.forEach(n -> 
n.addToSortedCollection(nodesForReplicaType));
+            }
+          }
+        }
+      }
+
+      placementPlans.add(
+          placementContext
+              .getPlacementPlanFactory()
+              .createPlacementPlan(request, replicaPlacements));
+    }
+    return placementPlans;
+  }
+
+  @Override
+  public BalancePlan computeBalancing(
+      BalanceRequest balanceRequest, PlacementContext placementContext) throws 
PlacementException {
+    Map<Replica, Node> replicaMovements = new HashMap<>();
+    TreeSet<WeightedNode> orderedNodes = new TreeSet<>();
+    Collection<WeightedNode> weightedNodes =
+        getWeightedNodes(
+                placementContext,
+                balanceRequest.getNodes(),
+                placementContext.getCluster().collections())
+            .values();
+    // This is critical to store the last sort weight for this node
+    weightedNodes.forEach(node -> node.addToSortedCollection(orderedNodes));
+
+    // While the node with the least cores still has room to take a replica 
from the node with the
+    // most cores, loop
+    Map<Replica, Node> newReplicaMovements = new HashMap<>();
+    ArrayList<WeightedNode> traversedHighNodes = new 
ArrayList<>(orderedNodes.size() - 1);
+    while (orderedNodes.size() > 1
+        && orderedNodes.first().calcWeight() < 
orderedNodes.last().calcWeight()) {
+      WeightedNode lowestWeight = orderedNodes.pollFirst();
+      if (lowestWeight == null) {
+        break;
+      }
+      if (lowestWeight.hasWeightChangedSinceSort()) {
+        // Re-sort this node and go back to find the lowest
+        lowestWeight.addToSortedCollection(orderedNodes);
+        continue;
+      }
+      log.debug(
+          "Lowest node: {}, weight: {}",
+          lowestWeight.getNode().getName(),
+          lowestWeight.calcWeight());
+
+      newReplicaMovements.clear();
+      // If a compatible node was found to move replicas, break and find the 
lowest weighted node
+      // again
+      while (newReplicaMovements.isEmpty()
+          && !orderedNodes.isEmpty()
+          && orderedNodes.last().calcWeight() > lowestWeight.calcWeight() + 1) 
{
+        WeightedNode highestWeight = orderedNodes.pollLast();
+        if (highestWeight == null) {
+          break;
+        }
+        if (highestWeight.hasWeightChangedSinceSort()) {
+          // Re-sort this node and go back to find the lowest
+          highestWeight.addToSortedCollection(orderedNodes);
+          continue;
+        }
+        log.debug(
+            "Highest node: {}, weight: {}",
+            highestWeight.getNode().getName(),
+            highestWeight.calcWeight());
+
+        traversedHighNodes.add(highestWeight);
+        // select a replica from the node with the most cores to move to the 
node with the least
+        // cores
+        Set<Replica> availableReplicasToMove = 
highestWeight.getAllReplicasOnNode();
+        int combinedNodeWeights = highestWeight.calcWeight() + 
lowestWeight.calcWeight();
+        for (Replica r : availableReplicasToMove) {
+          // Only continue if the replica can be removed from the old node and 
moved to the new node
+          if (highestWeight.canRemoveReplicas(Set.of(r)).isEmpty()
+              || !lowestWeight.canAddReplica(r)) {
+            continue;
+          }
+          lowestWeight.addReplica(r);
+          highestWeight.removeReplica(r);
+          int lowestWeightWithReplica = lowestWeight.calcWeight();
+          int highestWeightWithoutReplica = highestWeight.calcWeight();
+          log.debug(
+              "Replica: {}, lowestWith: {} ({}), highestWithout: {} ({})",
+              r.getReplicaName(),
+              lowestWeightWithReplica,
+              lowestWeight.canAddReplica(r),
+              highestWeightWithoutReplica,
+              highestWeight.canRemoveReplicas(Set.of(r)));
+
+          // If the combined weight of both nodes is lower after the move, 
make the move.
+          // Otherwise, make the move if it doesn't cause the weight of the 
higher node to
+          // go below the weight of the lower node, because that is 
over-correction.
+          if (highestWeightWithoutReplica + lowestWeightWithReplica >= 
combinedNodeWeights
+              && highestWeightWithoutReplica < lowestWeightWithReplica) {
+            // Undo the move
+            lowestWeight.removeReplica(r);
+            highestWeight.addReplica(r);
+            continue;
+          }
+          log.debug("Replica Movement Chosen!");
+          newReplicaMovements.put(r, lowestWeight.getNode());
+
+          // Do not go beyond here, do another loop and see if other nodes can 
move replicas.
+          // It might end up being the same nodes in the next loop that end up 
moving another
+          // replica, but that's ok.
+          break;
+        }
+      }
+      orderedNodes.addAll(traversedHighNodes);
+      traversedHighNodes.clear();
+      if (newReplicaMovements.size() > 0) {
+        replicaMovements.putAll(newReplicaMovements);
+        // There are no replicas to move to the lowestWeight, remove it from 
our loop
+        orderedNodes.add(lowestWeight);
+      }
+    }
+
+    return placementContext
+        .getBalancePlanFactory()
+        .createBalancePlan(balanceRequest, replicaMovements);
+  }
+
+  protected Map<Node, WeightedNode> getWeightedNodes(
+      PlacementContext placementContext,
+      Set<Node> nodes,
+      Iterable<SolrCollection> relevantCollections)
+      throws PlacementException {
+    Map<Node, WeightedNode> weightedNodes =
+        getBaseWeightedNodes(placementContext, nodes, relevantCollections);
+
+    for (SolrCollection collection : 
placementContext.getCluster().collections()) {
+      for (Shard shard : collection.shards()) {
+        for (Replica replica : shard.replicas()) {
+          WeightedNode weightedNode = weightedNodes.get(replica.getNode());
+          if (weightedNode != null) {
+            weightedNode.initReplica(replica);
+          }
+        }
+      }
+    }
+
+    return weightedNodes;
+  }
+
+  protected abstract Map<Node, WeightedNode> getBaseWeightedNodes(
+      PlacementContext placementContext,
+      Set<Node> nodes,
+      Iterable<SolrCollection> relevantCollections)
+      throws PlacementException;
+
+  @Override
+  public void verifyAllowedModification(
+      ModificationRequest modificationRequest, PlacementContext 
placementContext)
+      throws PlacementException {
+    if (modificationRequest instanceof DeleteShardsRequest) {
+      log.warn("DeleteShardsRequest not implemented yet, skipping: {}", 
modificationRequest);
+    } else if (modificationRequest instanceof DeleteCollectionRequest) {
+      verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, 
placementContext);
+    } else if (modificationRequest instanceof DeleteReplicasRequest) {
+      verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, 
placementContext);
+    } else {
+      log.warn("unsupported request type, skipping: {}", modificationRequest);
+    }
+  }
+
+  protected void verifyDeleteCollection(
+      DeleteCollectionRequest deleteCollectionRequest, PlacementContext 
placementContext)
+      throws PlacementException {
+    // NO-OP
+  }
+
+  protected void verifyDeleteReplicas(
+      DeleteReplicasRequest deleteReplicasRequest, PlacementContext 
placementContext)
+      throws PlacementException {
+    Map<Node, List<Replica>> nodesRepresented =
+        deleteReplicasRequest.getReplicas().stream()
+            .collect(Collectors.groupingBy(Replica::getNode));
+
+    Map<Node, WeightedNode> weightedNodes =
+        getWeightedNodes(
+            placementContext,
+            nodesRepresented.keySet(),
+            placementContext.getCluster().collections());
+
+    PlacementModificationException placementModificationException =
+        new PlacementModificationException("delete replica(s) rejected");
+    for (Map.Entry<Node, List<Replica>> entry : nodesRepresented.entrySet()) {
+      WeightedNode node = weightedNodes.get(entry.getKey());
+      if (node == null) {
+        entry
+            .getValue()
+            .forEach(
+                replica ->
+                    placementModificationException.addRejectedModification(
+                        replica.toString(),
+                        "could not load information for node: " + 
entry.getKey().getName()));
+      }
+      node.canRemoveReplicas(entry.getValue())
+          .forEach(
+              (replica, reason) ->
+                  placementModificationException.addRejectedModification(
+                      replica.toString(), reason));
+    }
+    if (!placementModificationException.getRejectedModifications().isEmpty()) {

Review Comment:
   <picture><img alt="16% of developers fix this issue" 
src="https://lift.sonatype.com/api/commentimage/fixrate/16/display.svg";></picture>
   
   <b>*NULL_DEREFERENCE:</b>*  object returned by 
`getRejectedModifications(placementModificationException)` could be null and is 
dereferenced at line 352.
   
   ---
   
   <details><summary>ℹ️ Expand to see all <b>@sonatype-lift</b> 
commands</summary>
   
   You can reply with the following commands. For example, reply with 
***@sonatype-lift ignoreall*** to leave out all findings.
   | **Command** | **Usage** |
   | ------------- | ------------- |
   | `@sonatype-lift ignore` | Leave out the above finding from this PR |
   | `@sonatype-lift ignoreall` | Leave out all the existing findings from this 
PR |
   | `@sonatype-lift exclude <file\|issue\|path\|tool>` | Exclude specified 
`file\|issue\|path\|tool` from Lift findings by updating your config.toml file |
   
   **Note:** When talking to LiftBot, you need to **refresh** the page to see 
its response.
   <sub>[Click here](https://github.com/apps/sonatype-lift/installations/new) 
to add LiftBot to another repo.</sub></details>
   
   



##########
solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java:
##########
@@ -53,40 +40,33 @@ public void call(ClusterState state, ZkNodeProps message, 
NamedList<Object> resu
       throws Exception {
     CollectionHandlingUtils.checkRequired(message, "node");
     String node = message.getStr("node");
-    List<ZkNodeProps> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, 
state);
+    List<Replica> sourceReplicas = ReplaceNodeCmd.getReplicasOfNode(node, 
state);
     List<String> singleReplicas = verifyReplicaAvailability(sourceReplicas, 
state);
     if (!singleReplicas.isEmpty()) {
       results.add(
           "failure",
           "Can't delete the only existing non-PULL replica(s) on node "
               + node
               + ": "
-              + singleReplicas.toString());
+              + singleReplicas);
     } else {
-      cleanupReplicas(results, state, sourceReplicas, ccc, node, 
message.getStr(ASYNC));
+      ReplicaMigrationUtils.cleanupReplicas(
+          results, state, sourceReplicas, ccc, message.getStr(ASYNC));
     }
   }
 
   // collect names of replicas that cannot be deleted
-  static List<String> verifyReplicaAvailability(
-      List<ZkNodeProps> sourceReplicas, ClusterState state) {
+  static List<String> verifyReplicaAvailability(List<Replica> sourceReplicas, 
ClusterState state) {
     List<String> res = new ArrayList<>();
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      String coll = sourceReplica.getStr(COLLECTION_PROP);
-      String shard = sourceReplica.getStr(SHARD_ID_PROP);
-      String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+    for (Replica sourceReplica : sourceReplicas) {
+      String coll = sourceReplica.getCollection();
+      String shard = sourceReplica.getShard();
+      String replicaName = sourceReplica.getName();
       DocCollection collection = state.getCollection(coll);
       Slice slice = collection.getSlice(shard);
       if (slice.getReplicas().size() < 2) {

Review Comment:
   <picture><img alt="7% of developers fix this issue" 
src="https://lift.sonatype.com/api/commentimage/fixrate/7/display.svg";></picture>
   
   <b>*NULLPTR_DEREFERENCE:</b>*  `slice` could be null (from the call to 
`DocCollection.getSlice(...)` on line 66) and is dereferenced.
   
   ---
   
   <details><summary>ℹ️ Expand to see all <b>@sonatype-lift</b> 
commands</summary>
   
   You can reply with the following commands. For example, reply with 
***@sonatype-lift ignoreall*** to leave out all findings.
   | **Command** | **Usage** |
   | ------------- | ------------- |
   | `@sonatype-lift ignore` | Leave out the above finding from this PR |
   | `@sonatype-lift ignoreall` | Leave out all the existing findings from this 
PR |
   | `@sonatype-lift exclude <file\|issue\|path\|tool>` | Exclude specified 
`file\|issue\|path\|tool` from Lift findings by updating your config.toml file |
   
   **Note:** When talking to LiftBot, you need to **refresh** the page to see 
its response.
   <sub>[Click here](https://github.com/apps/sonatype-lift/installations/new) 
to add LiftBot to another repo.</sub></details>
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org

Reply via email to