This is an automated email from the ASF dual-hosted git repository.

psalagnac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new b2f8b109dd7 SOLR-17578: Remove ZkController internal core supplier. (#2891)
b2f8b109dd7 is described below

commit b2f8b109dd7819708eb41066c679e4c9e425e057
Author: Pierre Salagnac <psalag...@apache.org>
AuthorDate: Wed Dec 11 11:30:28 2024 +0100

    SOLR-17578: Remove ZkController internal core supplier. (#2891)
---
 solr/CHANGES.txt                                   |   2 +-
 .../java/org/apache/solr/cloud/ZkController.java   | 198 +++++++++------------
 .../src/java/org/apache/solr/core/ZkContainer.java |  12 +-
 .../solr/cloud/LeaderElectionIntegrationTest.java  |   3 +-
 .../apache/solr/cloud/MockSimpleZkController.java  |  11 +-
 .../solr/cloud/TestLeaderElectionZkExpiry.java     |   4 +-
 .../org/apache/solr/cloud/ZkControllerTest.java    |  11 +-
 7 files changed, 98 insertions(+), 143 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7e057e943c4..f8a7ae0de27 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -152,7 +152,7 @@ Improvements
 
 Optimizations
 ---------------------
-(No changes)
+* SOLR-17578: Remove ZkController internal core supplier, for slightly faster reconnection after Zookeeper session loss. (Pierre Salagnac)
 
 Bug Fixes
 ---------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 76f9618d4b1..7dd30a14d24 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -50,7 +50,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.solr.client.solrj.SolrClient;
@@ -298,16 +297,12 @@ public class ZkController implements Closeable {
    * @param zkClientConnectTimeout timeout in ms
    * @param cloudConfig configuration for this controller. TODO: possibly 
redundant with
    *     CoreContainer
-   * @param descriptorsSupplier a supplier of the current core descriptors. 
used to know which cores
-   *     to re-register on reconnect
    */
-  @SuppressWarnings({"unchecked"})
   public ZkController(
       final CoreContainer cc,
       String zkServerAddress,
       int zkClientConnectTimeout,
-      CloudConfig cloudConfig,
-      final Supplier<List<CoreDescriptor>> descriptorsSupplier)
+      CloudConfig cloudConfig)
       throws InterruptedException, TimeoutException, IOException {
 
     if (cc == null) throw new IllegalArgumentException("CoreContainer cannot 
be null.");
@@ -366,9 +361,8 @@ public class ZkController implements Closeable {
             .withUrl(zkServerAddress)
             .withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
             .withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
-            .withReconnectListener(() -> onReconnect(descriptorsSupplier))
-            .withDisconnectListener(
-                (sessionExpired) -> onDisconnect(descriptorsSupplier, 
sessionExpired))
+            .withReconnectListener(this::onReconnect)
+            .withDisconnectListener((sessionExpired) -> 
onDisconnect(sessionExpired))
             .withAclProvider(zkACLProvider)
             .withClosedCheck(cc::isShutDown)
             .withCompressor(compressor)
@@ -404,18 +398,27 @@ public class ZkController implements Closeable {
     assert ObjectReleaseTracker.track(this);
   }
 
-  private void onDisconnect(
-      Supplier<List<CoreDescriptor>> descriptorsSupplier, boolean 
sessionExpired) {
+  private void onDisconnect(boolean sessionExpired) {
     try {
       overseer.close();
     } catch (Exception e) {
       log.error("Error trying to stop any Overseer threads", e);
     }
-    closeOutstandingElections(descriptorsSupplier, sessionExpired);
-    markAllAsNotLeader(descriptorsSupplier);
+
+    // Close outstanding leader elections
+    List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
+    for (CoreDescriptor descriptor : descriptors) {
+      closeExistingElectionContext(descriptor, sessionExpired);
+    }
+
+    // Mark all cores as not leader
+    for (CoreDescriptor descriptor : descriptors) {
+      descriptor.getCloudDescriptor().setLeader(false);
+      descriptor.getCloudDescriptor().setHasRegistered(false);
+    }
   }
 
-  private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) 
{
+  private void onReconnect() {
     // on reconnect, reload cloud info
     log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
     clearZkCollectionTerms();
@@ -456,7 +459,7 @@ public class ZkController implements Closeable {
       cc.cancelCoreRecoveries();
 
       try {
-        registerAllCoresAsDown(descriptorsSupplier, false);
+        registerAllCoresAsDown(false);
       } catch (SessionExpiredException e) {
         // zk has to reconnect and this will all be tried again
         throw e;
@@ -469,26 +472,24 @@ public class ZkController implements Closeable {
       // we have to register as live first to pick up docs in the buffer
       createEphemeralLiveNode();
 
-      List<CoreDescriptor> descriptors = descriptorsSupplier.get();
+      List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
       // re register all descriptors
       ExecutorService executorService = (cc != null) ? 
cc.getCoreZkRegisterExecutorService() : null;
-      if (descriptors != null) {
-        for (CoreDescriptor descriptor : descriptors) {
-          // TODO: we need to think carefully about what happens when it was a 
leader
-          // that was expired - as well as what to do about leaders/overseers 
with
-          // connection loss
-          try {
-            // unload solr cores that have been 'failed over'
-            throwErrorIfReplicaReplaced(descriptor);
+      for (CoreDescriptor descriptor : descriptors) {
+        // TODO: we need to think carefully about what happens when it was a 
leader
+        // that was expired - as well as what to do about leaders/overseers 
with
+        // connection loss
+        try {
+          // unload solr cores that have been 'failed over'
+          throwErrorIfReplicaReplaced(descriptor);
 
-            if (executorService != null) {
-              executorService.submit(new RegisterCoreAsync(descriptor, true, 
true));
-            } else {
-              register(descriptor.getName(), descriptor, true, true, false);
-            }
-          } catch (Exception e) {
-            log.error("Error registering SolrCore", e);
+          if (executorService != null) {
+            executorService.submit(new RegisterCoreAsync(descriptor, true, 
true));
+          } else {
+            register(descriptor.getName(), descriptor, true, true, false);
           }
+        } catch (Exception e) {
+          log.error("Error registering SolrCore", e);
         }
       }
 
@@ -588,75 +589,72 @@ public class ZkController implements Closeable {
     return leaderConflictResolveWait;
   }
 
-  private void registerAllCoresAsDown(
-      final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean 
updateLastPublished)
-      throws SessionExpiredException {
-    List<CoreDescriptor> descriptors = registerOnReconnect.get();
+  private void registerAllCoresAsDown(boolean updateLastPublished) throws 
SessionExpiredException {
+    List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
     if (isClosed) return;
-    if (descriptors != null) {
-      // before registering as live, make sure everyone is in a
-      // down state
-      publishNodeAsDown(getNodeName());
-      for (CoreDescriptor descriptor : descriptors) {
-        // if it looks like we are going to be the leader, we don't
-        // want to wait for the following stuff
-        CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
-        String collection = cloudDesc.getCollectionName();
-        String slice = cloudDesc.getShardId();
-        try {
 
-          int children =
-              zkStateReader
-                  .getZkClient()
-                  .getChildren(
-                      ZkStateReader.COLLECTIONS_ZKNODE
-                          + "/"
-                          + collection
-                          + "/leader_elect/"
-                          + slice
-                          + "/election",
-                      null,
-                      true)
-                  .size();
-          if (children == 0) {
-            log.debug(
-                "looks like we are going to be the leader for collection {} 
shard {}",
-                collection,
-                slice);
-            continue;
-          }
+    // before registering as live, make sure everyone is in a
+    // down state
+    publishNodeAsDown(getNodeName());
+    for (CoreDescriptor descriptor : descriptors) {
+      // if it looks like we are going to be the leader, we don't
+      // want to wait for the following stuff
+      CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
+      String collection = cloudDesc.getCollectionName();
+      String slice = cloudDesc.getShardId();
+      try {
 
-        } catch (NoNodeException e) {
+        int children =
+            zkStateReader
+                .getZkClient()
+                .getChildren(
+                    ZkStateReader.COLLECTIONS_ZKNODE
+                        + "/"
+                        + collection
+                        + "/leader_elect/"
+                        + slice
+                        + "/election",
+                    null,
+                    true)
+                .size();
+        if (children == 0) {
           log.debug(
               "looks like we are going to be the leader for collection {} 
shard {}",
               collection,
               slice);
           continue;
-        } catch (InterruptedException e2) {
-          Thread.currentThread().interrupt();
-        } catch (SessionExpiredException e) {
-          // zk has to reconnect
-          throw e;
-        } catch (KeeperException e) {
-          log.warn("", e);
-          Thread.currentThread().interrupt();
         }
 
-        final String coreZkNodeName = 
descriptor.getCloudDescriptor().getCoreNodeName();
-        try {
-          log.debug(
-              "calling waitForLeaderToSeeDownState for coreZkNodeName={} 
collection={} shard={}",
-              coreZkNodeName,
-              collection,
-              slice);
-          waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
-        } catch (Exception e) {
-          log.warn(
-              "There was a problem while making a best effort to ensure the 
leader has seen us as down, this is not unexpected as Zookeeper has just 
reconnected after a session expiration",
-              e);
-          if (isClosed) {
-            return;
-          }
+      } catch (NoNodeException e) {
+        log.debug(
+            "looks like we are going to be the leader for collection {} shard 
{}",
+            collection,
+            slice);
+        continue;
+      } catch (InterruptedException e2) {
+        Thread.currentThread().interrupt();
+      } catch (SessionExpiredException e) {
+        // zk has to reconnect
+        throw e;
+      } catch (KeeperException e) {
+        log.warn("", e);
+        Thread.currentThread().interrupt();
+      }
+
+      final String coreZkNodeName = 
descriptor.getCloudDescriptor().getCoreNodeName();
+      try {
+        log.debug(
+            "calling waitForLeaderToSeeDownState for coreZkNodeName={} 
collection={} shard={}",
+            coreZkNodeName,
+            collection,
+            slice);
+        waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
+      } catch (Exception e) {
+        log.warn(
+            "There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration",
+            e);
+        if (isClosed) {
+          return;
         }
       }
     }
@@ -666,16 +664,6 @@ public class ZkController implements Closeable {
     return sysPropsCacher;
   }
 
-  private void closeOutstandingElections(
-      final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean 
sessionExpired) {
-    List<CoreDescriptor> descriptors = registerOnReconnect.get();
-    if (descriptors != null) {
-      for (CoreDescriptor descriptor : descriptors) {
-        closeExistingElectionContext(descriptor, sessionExpired);
-      }
-    }
-  }
-
   private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean 
sessionExpired) {
     // look for old context - if we find it, cancel it
     String collection = cd.getCloudDescriptor().getCollectionName();
@@ -696,16 +684,6 @@ public class ZkController implements Closeable {
     return contextKey;
   }
 
-  private void markAllAsNotLeader(final Supplier<List<CoreDescriptor>> 
registerOnReconnect) {
-    List<CoreDescriptor> descriptors = registerOnReconnect.get();
-    if (descriptors != null) {
-      for (CoreDescriptor descriptor : descriptors) {
-        descriptor.getCloudDescriptor().setLeader(false);
-        descriptor.getCloudDescriptor().setHasRegistered(false);
-      }
-    }
-  }
-
   public void preClose() {
     this.isClosed = true;
 
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 6dd7b2d571d..3dbd1652993 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -23,12 +23,9 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Paths;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
 import org.apache.solr.cloud.SolrZkServer;
 import org.apache.solr.cloud.ZkController;
@@ -132,15 +129,8 @@ public class ZkContainer {
               "A chroot was specified in ZkHost but the znode doesn't exist. " 
+ zookeeperHost);
         }
 
-        Supplier<List<CoreDescriptor>> descriptorsSupplier =
-            () ->
-                cc.getCores().stream()
-                    .map(SolrCore::getCoreDescriptor)
-                    .collect(Collectors.toList());
-
         ZkController zkController =
-            new ZkController(
-                cc, zookeeperHost, zkClientConnectTimeout, config, 
descriptorsSupplier);
+            new ZkController(cc, zookeeperHost, zkClientConnectTimeout, 
config);
 
         if (zkRun != null) {
           if (StrUtils.isNotNullOrEmpty(System.getProperty(HTTPS_PORT_PROP))) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
index 5da2f862ea2..32f921747d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
@@ -73,10 +73,9 @@ public class LeaderElectionIntegrationTest extends SolrCloudTestCase {
           "shard1",
           jetty
               .getCoreContainer()
-              .getCores()
+              .getCoreDescriptors()
               .iterator()
               .next()
-              .getCoreDescriptor()
               .getCloudDescriptor()
               .getShardId());
       String jettyNodeName = jetty.getNodeName(); // must get before shutdown
diff --git a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
index a553e1064b0..2c678132a86 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
@@ -17,23 +17,16 @@
 package org.apache.solr.cloud;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
 
 public class MockSimpleZkController extends ZkController {
 
   public MockSimpleZkController(
-      CoreContainer cc,
-      String zkServerAddress,
-      int zkClientConnectTimeout,
-      CloudConfig cloudConfig,
-      Supplier<List<CoreDescriptor>> descriptorsSupplier)
+      CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, 
CloudConfig cloudConfig)
       throws InterruptedException, TimeoutException, IOException {
-    super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig, 
descriptorsSupplier);
+    super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig);
   }
 
   @Override
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
index a871fc7176c..2c8520ddc22 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
@@ -18,7 +18,6 @@ package org.apache.solr.cloud;
 
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.lucene.tests.util.LuceneTestCase.BadApple;
@@ -63,8 +62,7 @@ public class TestLeaderElectionZkExpiry extends 
SolrTestCaseJ4 {
             ExecutorUtil.newMDCAwareSingleThreadExecutor(
                 new SolrNamedThreadFactory(this.getTestName()));
         try (ZkController zkController =
-            new ZkController(
-                cc, server.getZkAddress(), 15000, cloudConfig, 
Collections::emptyList)) {
+            new ZkController(cc, server.getZkAddress(), 15000, cloudConfig)) {
           threadExecutor.execute(
               () -> {
                 TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, 
TimeSource.NANO_TIME);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 6bda0dd6b53..c5be38b570c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -166,8 +166,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
 
       try {
         CloudConfig cloudConfig = new 
CloudConfig.CloudConfigBuilder("127.0.0.1", 8983).build();
-        zkController =
-            new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, 
() -> null);
+        zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, 
cloudConfig);
       } catch (IllegalArgumentException e) {
         fail("ZkController did not normalize host name correctly");
       } finally {
@@ -233,8 +232,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
                     
Boolean.getBoolean("solr.distributedCollectionConfigSetExecution"))
                 .build();
         zkController =
-            new ZkController(
-                cc, cluster.getZkServer().getZkAddress(), TIMEOUT, 
cloudConfig, () -> null);
+            new ZkController(cc, cluster.getZkServer().getZkAddress(), 
TIMEOUT, cloudConfig);
         zkControllerRef.set(zkController);
 
         zkController
@@ -399,7 +397,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
         try {
           CloudConfig cloudConfig = new 
CloudConfig.CloudConfigBuilder("127.0.0.1", 8983).build();
           try (ZkController zkController =
-              new ZkController(cc, server.getZkAddress(), TIMEOUT, 
cloudConfig, () -> null)) {
+              new ZkController(cc, server.getZkAddress(), TIMEOUT, 
cloudConfig)) {
             final Path dir = createTempDir();
             final String configsetName = "testconfigset";
             try (ZkSolrResourceLoader loader =
@@ -478,8 +476,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
                         cc,
                         server.getZkAddress(),
                         TIMEOUT,
-                        new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983 + 
index).build(),
-                        () -> null);
+                        new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983 + 
index).build());
               } catch (Exception e) {
                 exception.compareAndSet(null, e);
               } finally {

Reply via email to