This is an automated email from the ASF dual-hosted git repository.
psalagnac pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 36ab9605e58 SOLR-17578: Remove ZkController internal core supplier. (#2891)
36ab9605e58 is described below
commit 36ab9605e5869a98430b00fad225d6a59330fdd2
Author: Pierre Salagnac <[email protected]>
AuthorDate: Wed Dec 11 11:30:28 2024 +0100
SOLR-17578: Remove ZkController internal core supplier. (#2891)
(cherry picked from commit b2f8b109dd7819708eb41066c679e4c9e425e057)
---
solr/CHANGES.txt | 2 +-
.../java/org/apache/solr/cloud/ZkController.java | 196 +++++++++------------
.../src/java/org/apache/solr/core/ZkContainer.java | 12 +-
.../solr/cloud/LeaderElectionIntegrationTest.java | 3 +-
.../apache/solr/cloud/MockSimpleZkController.java | 11 +-
.../solr/cloud/TestLeaderElectionZkExpiry.java | 4 +-
.../org/apache/solr/cloud/ZkControllerTest.java | 11 +-
7 files changed, 98 insertions(+), 141 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7b9a36db61c..e8c7b19f797 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -15,7 +15,7 @@ Improvements
Optimizations
---------------------
-(No changes)
+* SOLR-17578: Remove ZkController internal core supplier, for slightly faster
reconnection after Zookeeper session loss. (Pierre Salagnac)
Bug Fixes
---------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 07e7743e457..983826d974c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -54,7 +54,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
@@ -304,16 +303,12 @@ public class ZkController implements Closeable {
* @param zkClientConnectTimeout timeout in ms
* @param cloudConfig configuration for this controller. TODO: possibly
redundant with
* CoreContainer
- * @param descriptorsSupplier a supplier of the current core descriptors.
used to know which cores
- * to re-register on reconnect
*/
- @SuppressWarnings({"unchecked"})
public ZkController(
final CoreContainer cc,
String zkServerAddress,
int zkClientConnectTimeout,
- CloudConfig cloudConfig,
- final Supplier<List<CoreDescriptor>> descriptorsSupplier)
+ CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot
be null.");
@@ -385,8 +380,8 @@ public class ZkController implements Closeable {
.withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
.withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
.withConnStrategy(strat)
- .withReconnectListener(() -> onReconnect(descriptorsSupplier))
- .withBeforeConnect(() -> beforeReconnect(descriptorsSupplier))
+ .withReconnectListener(this::onReconnect)
+ .withBeforeConnect(this::onDisconnect)
.withAclProvider(zkACLProvider)
.withClosedCheck(cc::isShutDown)
.withCompressor(compressor)
@@ -422,18 +417,27 @@ public class ZkController implements Closeable {
assert ObjectReleaseTracker.track(this);
}
- private void beforeReconnect(Supplier<List<CoreDescriptor>>
descriptorsSupplier) {
+ private void onDisconnect() {
try {
overseer.close();
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
- closeOutstandingElections(descriptorsSupplier);
- markAllAsNotLeader(descriptorsSupplier);
+
+ // Close outstanding leader elections
+ List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
+ for (CoreDescriptor descriptor : descriptors) {
+ closeExistingElectionContext(descriptor);
+ }
+
+ // Mark all cores as not leader
+ for (CoreDescriptor descriptor : descriptors) {
+ descriptor.getCloudDescriptor().setLeader(false);
+ descriptor.getCloudDescriptor().setHasRegistered(false);
+ }
}
- private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier)
- throws SessionExpiredException {
+ private void onReconnect() throws SessionExpiredException {
// on reconnect, reload cloud info
log.info("ZooKeeper session re-connected ... refreshing core states after
session expiration.");
clearZkCollectionTerms();
@@ -472,7 +476,7 @@ public class ZkController implements Closeable {
cc.cancelCoreRecoveries();
try {
- registerAllCoresAsDown(descriptorsSupplier, false);
+ registerAllCoresAsDown(false);
} catch (SessionExpiredException e) {
// zk has to reconnect and this will all be tried again
throw e;
@@ -485,26 +489,24 @@ public class ZkController implements Closeable {
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();
- List<CoreDescriptor> descriptors = descriptorsSupplier.get();
+ List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
// re register all descriptors
ExecutorService executorService = (cc != null) ?
cc.getCoreZkRegisterExecutorService() : null;
- if (descriptors != null) {
- for (CoreDescriptor descriptor : descriptors) {
- // TODO: we need to think carefully about what happens when it was a
leader
- // that was expired - as well as what to do about leaders/overseers
with
- // connection loss
- try {
- // unload solr cores that have been 'failed over'
- throwErrorIfReplicaReplaced(descriptor);
+ for (CoreDescriptor descriptor : descriptors) {
+ // TODO: we need to think carefully about what happens when it was a
leader
+ // that was expired - as well as what to do about leaders/overseers
with
+ // connection loss
+ try {
+ // unload solr cores that have been 'failed over'
+ throwErrorIfReplicaReplaced(descriptor);
- if (executorService != null) {
- executorService.submit(new RegisterCoreAsync(descriptor, true,
true));
- } else {
- register(descriptor.getName(), descriptor, true, true, false);
- }
- } catch (Exception e) {
- log.error("Error registering SolrCore", e);
+ if (executorService != null) {
+ executorService.submit(new RegisterCoreAsync(descriptor, true,
true));
+ } else {
+ register(descriptor.getName(), descriptor, true, true, false);
}
+ } catch (Exception e) {
+ log.error("Error registering SolrCore", e);
}
}
@@ -598,75 +600,72 @@ public class ZkController implements Closeable {
return leaderConflictResolveWait;
}
- private void registerAllCoresAsDown(
- final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean
updateLastPublished)
- throws SessionExpiredException {
- List<CoreDescriptor> descriptors = registerOnReconnect.get();
+ private void registerAllCoresAsDown(boolean updateLastPublished) throws
SessionExpiredException {
+ List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
if (isClosed) return;
- if (descriptors != null) {
- // before registering as live, make sure everyone is in a
- // down state
- publishNodeAsDown(getNodeName());
- for (CoreDescriptor descriptor : descriptors) {
- // if it looks like we are going to be the leader, we don't
- // want to wait for the following stuff
- CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
- String collection = cloudDesc.getCollectionName();
- String slice = cloudDesc.getShardId();
- try {
- int children =
- zkStateReader
- .getZkClient()
- .getChildren(
- ZkStateReader.COLLECTIONS_ZKNODE
- + "/"
- + collection
- + "/leader_elect/"
- + slice
- + "/election",
- null,
- true)
- .size();
- if (children == 0) {
- log.debug(
- "looks like we are going to be the leader for collection {}
shard {}",
- collection,
- slice);
- continue;
- }
+ // before registering as live, make sure everyone is in a
+ // down state
+ publishNodeAsDown(getNodeName());
+ for (CoreDescriptor descriptor : descriptors) {
+ // if it looks like we are going to be the leader, we don't
+ // want to wait for the following stuff
+ CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
+ String collection = cloudDesc.getCollectionName();
+ String slice = cloudDesc.getShardId();
+ try {
- } catch (NoNodeException e) {
+ int children =
+ zkStateReader
+ .getZkClient()
+ .getChildren(
+ ZkStateReader.COLLECTIONS_ZKNODE
+ + "/"
+ + collection
+ + "/leader_elect/"
+ + slice
+ + "/election",
+ null,
+ true)
+ .size();
+ if (children == 0) {
log.debug(
"looks like we are going to be the leader for collection {}
shard {}",
collection,
slice);
continue;
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- } catch (SessionExpiredException e) {
- // zk has to reconnect
- throw e;
- } catch (KeeperException e) {
- log.warn("", e);
- Thread.currentThread().interrupt();
}
- final String coreZkNodeName =
descriptor.getCloudDescriptor().getCoreNodeName();
- try {
- log.debug(
- "calling waitForLeaderToSeeDownState for coreZkNodeName={}
collection={} shard={}",
- coreZkNodeName,
- collection,
- slice);
- waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
- } catch (Exception e) {
- log.warn(
- "There was a problem while making a best effort to ensure the
leader has seen us as down, this is not unexpected as Zookeeper has just
reconnected after a session expiration",
- e);
- if (isClosed) {
- return;
- }
+ } catch (NoNodeException e) {
+ log.debug(
+ "looks like we are going to be the leader for collection {} shard
{}",
+ collection,
+ slice);
+ continue;
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ } catch (SessionExpiredException e) {
+ // zk has to reconnect
+ throw e;
+ } catch (KeeperException e) {
+ log.warn("", e);
+ Thread.currentThread().interrupt();
+ }
+
+ final String coreZkNodeName =
descriptor.getCloudDescriptor().getCoreNodeName();
+ try {
+ log.debug(
+ "calling waitForLeaderToSeeDownState for coreZkNodeName={}
collection={} shard={}",
+ coreZkNodeName,
+ collection,
+ slice);
+ waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
+ } catch (Exception e) {
+ log.warn(
+ "There was a problem while making a best effort to ensure the
leader has seen us as down, this is not unexpected as Zookeeper has just
reconnected after a session expiration",
+ e);
+ if (isClosed) {
+ return;
}
}
}
@@ -676,15 +675,6 @@ public class ZkController implements Closeable {
return sysPropsCacher;
}
- private void closeOutstandingElections(final Supplier<List<CoreDescriptor>>
registerOnReconnect) {
- List<CoreDescriptor> descriptors = registerOnReconnect.get();
- if (descriptors != null) {
- for (CoreDescriptor descriptor : descriptors) {
- closeExistingElectionContext(descriptor);
- }
- }
- }
-
private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
@@ -701,16 +691,6 @@ public class ZkController implements Closeable {
return contextKey;
}
- private void markAllAsNotLeader(final Supplier<List<CoreDescriptor>>
registerOnReconnect) {
- List<CoreDescriptor> descriptors = registerOnReconnect.get();
- if (descriptors != null) {
- for (CoreDescriptor descriptor : descriptors) {
- descriptor.getCloudDescriptor().setLeader(false);
- descriptor.getCloudDescriptor().setHasRegistered(false);
- }
- }
- }
-
public void preClose() {
this.isClosed = true;
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index f7ea012b249..88a7d67bbb5 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -23,12 +23,9 @@ import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Paths;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
import org.apache.solr.cloud.SolrZkServer;
import org.apache.solr.cloud.ZkController;
@@ -139,15 +136,8 @@ public class ZkContainer {
"A chroot was specified in ZkHost but the znode doesn't exist. "
+ zookeeperHost);
}
- Supplier<List<CoreDescriptor>> descriptorsSupplier =
- () ->
- cc.getCores().stream()
- .map(SolrCore::getCoreDescriptor)
- .collect(Collectors.toList());
-
ZkController zkController =
- new ZkController(
- cc, zookeeperHost, zkClientConnectTimeout, config,
descriptorsSupplier);
+ new ZkController(cc, zookeeperHost, zkClientConnectTimeout,
config);
if (zkRun != null) {
if (StrUtils.isNotNullOrEmpty(System.getProperty(HTTPS_PORT_PROP))) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
index 5da2f862ea2..32f921747d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
@@ -73,10 +73,9 @@ public class LeaderElectionIntegrationTest extends SolrCloudTestCase {
"shard1",
jetty
.getCoreContainer()
- .getCores()
+ .getCoreDescriptors()
.iterator()
.next()
- .getCoreDescriptor()
.getCloudDescriptor()
.getShardId());
String jettyNodeName = jetty.getNodeName(); // must get before shutdown
diff --git a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
index a553e1064b0..2c678132a86 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
@@ -17,23 +17,16 @@
package org.apache.solr.cloud;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
public class MockSimpleZkController extends ZkController {
public MockSimpleZkController(
- CoreContainer cc,
- String zkServerAddress,
- int zkClientConnectTimeout,
- CloudConfig cloudConfig,
- Supplier<List<CoreDescriptor>> descriptorsSupplier)
+ CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout,
CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {
- super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig,
descriptorsSupplier);
+ super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig);
}
@Override
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
index 93da4559990..ce12f94fefa 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
@@ -18,7 +18,6 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
-import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.tests.util.LuceneTestCase.BadApple;
@@ -63,8 +62,7 @@ public class TestLeaderElectionZkExpiry extends SolrTestCaseJ4 {
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory(this.getTestName()));
try (ZkController zkController =
- new ZkController(
- cc, server.getZkAddress(), 15000, cloudConfig,
Collections::emptyList)) {
+ new ZkController(cc, server.getZkAddress(), 15000, cloudConfig)) {
threadExecutor.execute(
() -> {
TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS,
TimeSource.NANO_TIME);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 2a6ae440dc3..004b6f0bcf5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -201,8 +201,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
try {
CloudConfig cloudConfig =
new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983,
"solr").build();
- zkController =
- new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig,
() -> null);
+ zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT,
cloudConfig);
} catch (IllegalArgumentException e) {
fail("ZkController did not normalize host name correctly");
} finally {
@@ -268,8 +267,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
Boolean.getBoolean("solr.distributedCollectionConfigSetExecution"))
.build();
zkController =
- new ZkController(
- cc, cluster.getZkServer().getZkAddress(), TIMEOUT,
cloudConfig, () -> null);
+ new ZkController(cc, cluster.getZkServer().getZkAddress(),
TIMEOUT, cloudConfig);
zkControllerRef.set(zkController);
zkController
@@ -435,7 +433,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
CloudConfig cloudConfig =
new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983,
"solr").build();
try (ZkController zkController =
- new ZkController(cc, server.getZkAddress(), TIMEOUT,
cloudConfig, () -> null)) {
+ new ZkController(cc, server.getZkAddress(), TIMEOUT,
cloudConfig)) {
final Path dir = createTempDir();
final String configsetName = "testconfigset";
try (ZkSolrResourceLoader loader =
@@ -515,8 +513,7 @@ public class ZkControllerTest extends SolrCloudTestCase {
server.getZkAddress(),
TIMEOUT,
new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983 +
index, "solr")
- .build(),
- () -> null);
+ .build());
} catch (Exception e) {
exception.compareAndSet(null, e);
} finally {