This is an automated email from the ASF dual-hosted git repository.

broustant pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 89d911119dd SOLR-17054: Remove unused and duplicate code in DistributedZkUpdateProcessor (#2038)
89d911119dd is described below

commit 89d911119dd0bd7de47644031257047cb505e482
Author: Vincent P <[email protected]>
AuthorDate: Thu Oct 26 10:58:35 2023 +0200

    SOLR-17054: Remove unused and duplicate code in DistributedZkUpdateProcessor (#2038)
    
    Co-authored-by: Vincent Primault <[email protected]>
---
 .../processor/DistributedZkUpdateProcessor.java    | 98 ++++------------------
 1 file changed, 17 insertions(+), 81 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index a1e491c82bb..35d96a7c14a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -179,15 +179,12 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
     }
     isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
 
-    nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG, 
Replica.Type.NRT), true);
+    nodes = getCollectionUrls(collection);
     if (nodes == null) {
       // This could happen if there are only pull replicas
       throw new SolrException(
           SolrException.ErrorCode.SERVER_ERROR,
-          "Unable to distribute commit operation. No replicas available of 
types "
-              + Replica.Type.TLOG
-              + " or "
-              + Replica.Type.NRT);
+          "Unable to distribute commit operation. No leader replicas 
available.");
     }
 
     nodes.removeIf(
@@ -212,13 +209,11 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
         useNodes = nodes;
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
         params.set(COMMIT_END_POINT, "leaders");
-        if (useNodes != null) {
-          params.set(
-              DISTRIB_FROM,
-              ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), 
req.getCore().getName()));
-          cmdDistrib.distribCommit(cmd, useNodes, params);
-          issuedDistribCommit = true;
-        }
+        params.set(
+            DISTRIB_FROM,
+            ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), 
req.getCore().getName()));
+        cmdDistrib.distribCommit(cmd, useNodes, params);
+        issuedDistribCommit = true;
       }
 
       if (isLeader) {
@@ -232,7 +227,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
 
         params.set(COMMIT_END_POINT, "replicas");
 
-        useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), 
leaderReplica);
+        useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), 
leaderReplica, 0);
 
         if (useNodes != null) {
           params.set(
@@ -786,52 +781,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
         // that means I want to forward onto my replicas...
         // so get the replicas...
         forwardToLeader = false;
-        String leaderCoreNodeName = leaderReplica.getName();
-        List<Replica> replicas =
-            clusterState
-                .getCollection(collection)
-                .getSlice(shardId)
-                .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
-        replicas.removeIf((replica) -> 
replica.getName().equals(leaderCoreNodeName));
-        if (replicas.isEmpty()) {
-          return null;
-        }
-
-        // check for test param that lets us miss replicas
-        String[] skipList = 
req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
-        Set<String> skipListSet = null;
-        if (skipList != null) {
-          skipListSet = CollectionUtil.newHashSet(skipList.length);
-          skipListSet.addAll(Arrays.asList(skipList));
-          log.info("test.distrib.skip.servers was found and contains:{}", 
skipListSet);
-        }
-
-        List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
-        skippedCoreNodeNames = new HashSet<>();
-        ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, 
shardId);
-        for (Replica replica : replicas) {
-          String coreNodeName = replica.getName();
-          if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
-            if (log.isInfoEnabled()) {
-              log.info("check url:{} against:{} result:true", 
replica.getCoreUrl(), skipListSet);
-            }
-          } else if (zkShardTerms.registered(coreNodeName)
-              && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
-            if (log.isDebugEnabled()) {
-              log.debug("skip url:{} cause its term is less than leader", 
replica.getCoreUrl());
-            }
-            skippedCoreNodeNames.add(replica.getName());
-          } else if 
(!clusterState.getLiveNodes().contains(replica.getNodeName())
-              || replica.getState() == Replica.State.DOWN) {
-            skippedCoreNodeNames.add(replica.getName());
-          } else {
-            nodes.add(
-                new SolrCmdDistributor.StdNode(
-                    new ZkCoreNodeProps(replica), collection, shardId, 
maxRetriesToFollowers));
-          }
-        }
-        return nodes;
-
+        return getReplicaNodesForLeader(shardId, leaderReplica, 
maxRetriesToFollowers);
       } else {
         // I need to forward on to the leader...
         forwardToLeader = true;
@@ -883,8 +833,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
     }
   }
 
-  private List<SolrCmdDistributor.Node> getCollectionUrls(
-      String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
+  private List<SolrCmdDistributor.Node> getCollectionUrls(String collection) {
     final DocCollection docCollection = 
clusterState.getCollectionOrNull(collection);
     if (docCollection == null || docCollection.getSlicesMap() == null) {
       throw new ZooKeeperException(
@@ -894,24 +843,10 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
     final List<SolrCmdDistributor.Node> urls = new ArrayList<>(slices.size());
     for (Map.Entry<String, Slice> sliceEntry : slices.entrySet()) {
       Slice replicas = slices.get(sliceEntry.getKey());
-      if (onlyLeaders) {
-        Replica replica = docCollection.getLeader(replicas.getName());
-        if (replica != null) {
-          ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
-          urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, 
replicas.getName()));
-        }
-        continue;
-      }
-      Map<String, Replica> shardMap = replicas.getReplicasMap();
-
-      for (Map.Entry<String, Replica> entry : shardMap.entrySet()) {
-        if (!types.contains(entry.getValue().getType())) {
-          continue;
-        }
-        ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
-        if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
-          urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, 
replicas.getName()));
-        }
+      Replica replica = docCollection.getLeader(replicas.getName());
+      if (replica != null) {
+        ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
+        urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, 
replicas.getName()));
       }
     }
     if (urls.isEmpty()) {
@@ -959,7 +894,7 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
   }
 
   protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(
-      String shardId, Replica leaderReplica) {
+      String shardId, Replica leaderReplica, int maxRetries) {
     String leaderCoreNodeName = leaderReplica.getName();
     List<Replica> replicas =
         clusterState
@@ -1000,7 +935,8 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
         skippedCoreNodeNames.add(replica.getName());
       } else {
         nodes.add(
-            new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), 
collection, shardId));
+            new SolrCmdDistributor.StdNode(
+                new ZkCoreNodeProps(replica), collection, shardId, 
maxRetries));
       }
     }
     return nodes;

Reply via email to