This is an automated email from the ASF dual-hosted git repository. psalagnac pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push: new b2f8b109dd7 SOLR-17578: Remove ZkController internal core supplier. (#2891) b2f8b109dd7 is described below commit b2f8b109dd7819708eb41066c679e4c9e425e057 Author: Pierre Salagnac <psalag...@apache.org> AuthorDate: Wed Dec 11 11:30:28 2024 +0100 SOLR-17578: Remove ZkController internal core supplier. (#2891) --- solr/CHANGES.txt | 2 +- .../java/org/apache/solr/cloud/ZkController.java | 198 +++++++++------------ .../src/java/org/apache/solr/core/ZkContainer.java | 12 +- .../solr/cloud/LeaderElectionIntegrationTest.java | 3 +- .../apache/solr/cloud/MockSimpleZkController.java | 11 +- .../solr/cloud/TestLeaderElectionZkExpiry.java | 4 +- .../org/apache/solr/cloud/ZkControllerTest.java | 11 +- 7 files changed, 98 insertions(+), 143 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7e057e943c4..f8a7ae0de27 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -152,7 +152,7 @@ Improvements Optimizations --------------------- -(No changes) +* SOLR-17578: Remove ZkController internal core supplier, for slightly faster reconnection after Zookeeper session loss. (Pierre Salagnac) Bug Fixes --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 76f9618d4b1..7dd30a14d24 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -50,7 +50,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.curator.framework.api.ACLProvider; import org.apache.solr.client.solrj.SolrClient; @@ -298,16 +297,12 @@ public class ZkController implements Closeable { * @param zkClientConnectTimeout timeout in ms * @param cloudConfig configuration for this controller. TODO: possibly redundant with * CoreContainer - * @param descriptorsSupplier a supplier of the current core descriptors. used to know which cores - * to re-register on reconnect */ - @SuppressWarnings({"unchecked"}) public ZkController( final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, - CloudConfig cloudConfig, - final Supplier<List<CoreDescriptor>> descriptorsSupplier) + CloudConfig cloudConfig) throws InterruptedException, TimeoutException, IOException { if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null."); @@ -366,9 +361,8 @@ public class ZkController implements Closeable { .withUrl(zkServerAddress) .withTimeout(clientTimeout, TimeUnit.MILLISECONDS) .withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS) - .withReconnectListener(() -> onReconnect(descriptorsSupplier)) - .withDisconnectListener( - (sessionExpired) -> onDisconnect(descriptorsSupplier, sessionExpired)) + .withReconnectListener(this::onReconnect) + .withDisconnectListener((sessionExpired) -> onDisconnect(sessionExpired)) .withAclProvider(zkACLProvider) .withClosedCheck(cc::isShutDown) .withCompressor(compressor) @@ -404,18 +398,27 @@ public class ZkController implements Closeable { assert ObjectReleaseTracker.track(this); } - private void onDisconnect( - Supplier<List<CoreDescriptor>> descriptorsSupplier, boolean sessionExpired) { + private void onDisconnect(boolean sessionExpired) { try { overseer.close(); } catch (Exception e) { log.error("Error trying to stop any Overseer threads", e); } - closeOutstandingElections(descriptorsSupplier, sessionExpired); - markAllAsNotLeader(descriptorsSupplier); + + // Close outstanding leader elections + List<CoreDescriptor> descriptors = cc.getCoreDescriptors(); + for (CoreDescriptor descriptor : descriptors) { + closeExistingElectionContext(descriptor, sessionExpired); + } + + // Mark all cores as not leader + for (CoreDescriptor descriptor : descriptors) { + descriptor.getCloudDescriptor().setLeader(false); + descriptor.getCloudDescriptor().setHasRegistered(false); + } } - private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) { + private void onReconnect() { // on reconnect, reload cloud info log.info("ZooKeeper session re-connected ... refreshing core states after session expiration."); clearZkCollectionTerms(); @@ -456,7 +459,7 @@ public class ZkController implements Closeable { cc.cancelCoreRecoveries(); try { - registerAllCoresAsDown(descriptorsSupplier, false); + registerAllCoresAsDown(false); } catch (SessionExpiredException e) { // zk has to reconnect and this will all be tried again throw e; @@ -469,26 +472,24 @@ public class ZkController implements Closeable { // we have to register as live first to pick up docs in the buffer createEphemeralLiveNode(); - List<CoreDescriptor> descriptors = descriptorsSupplier.get(); + List<CoreDescriptor> descriptors = cc.getCoreDescriptors(); // re register all descriptors ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null; - if (descriptors != null) { - for (CoreDescriptor descriptor : descriptors) { - // TODO: we need to think carefully about what happens when it was a leader - // that was expired - as well as what to do about leaders/overseers with - // connection loss - try { - // unload solr cores that have been 'failed over' - throwErrorIfReplicaReplaced(descriptor); + for (CoreDescriptor descriptor : descriptors) { + // TODO: we need to think carefully about what happens when it was a leader + // that was expired - as well as what to do about leaders/overseers with + // connection loss + try { + // unload solr cores that have been 'failed over' + throwErrorIfReplicaReplaced(descriptor); - if (executorService != null) { - executorService.submit(new RegisterCoreAsync(descriptor, true, true)); - } else { - register(descriptor.getName(), descriptor, true, true, false); - } - } catch (Exception e) { - log.error("Error registering SolrCore", e); + if (executorService != null) { + executorService.submit(new RegisterCoreAsync(descriptor, true, true)); + } else { + register(descriptor.getName(), descriptor, true, true, false); } + } catch (Exception e) { + log.error("Error registering SolrCore", e); } } @@ -588,75 +589,72 @@ public class ZkController implements Closeable { return leaderConflictResolveWait; } - private void registerAllCoresAsDown( - final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean updateLastPublished) - throws SessionExpiredException { - List<CoreDescriptor> descriptors = registerOnReconnect.get(); + private void registerAllCoresAsDown(boolean updateLastPublished) throws SessionExpiredException { + List<CoreDescriptor> descriptors = cc.getCoreDescriptors(); if (isClosed) return; - if (descriptors != null) { - // before registering as live, make sure everyone is in a - // down state - publishNodeAsDown(getNodeName()); - for (CoreDescriptor descriptor : descriptors) { - // if it looks like we are going to be the leader, we don't - // want to wait for the following stuff - CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); - String collection = cloudDesc.getCollectionName(); - String slice = cloudDesc.getShardId(); - try { - int children = - zkStateReader - .getZkClient() - .getChildren( - ZkStateReader.COLLECTIONS_ZKNODE - + "/" - + collection - + "/leader_elect/" - + slice - + "/election", - null, - true) - .size(); - if (children == 0) { - log.debug( - "looks like we are going to be the leader for collection {} shard {}", - collection, - slice); - continue; - } + // before registering as live, make sure everyone is in a + // down state + publishNodeAsDown(getNodeName()); + for (CoreDescriptor descriptor : descriptors) { + // if it looks like we are going to be the leader, we don't + // want to wait for the following stuff + CloudDescriptor cloudDesc = descriptor.getCloudDescriptor(); + String collection = cloudDesc.getCollectionName(); + String slice = cloudDesc.getShardId(); + try { - } catch (NoNodeException e) { + int children = + zkStateReader + .getZkClient() + .getChildren( + ZkStateReader.COLLECTIONS_ZKNODE + + "/" + + collection + + "/leader_elect/" + + slice + + "/election", + null, + true) + .size(); + if (children == 0) { log.debug( "looks like we are going to be the leader for collection {} shard {}", collection, slice); continue; - } catch (InterruptedException e2) { - Thread.currentThread().interrupt(); - } catch (SessionExpiredException e) { - // zk has to reconnect - throw e; - } catch (KeeperException e) { - log.warn("", e); - Thread.currentThread().interrupt(); } - final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName(); - try { - log.debug( - "calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", - coreZkNodeName, - collection, - slice); - waitForLeaderToSeeDownState(descriptor, coreZkNodeName); - } catch (Exception e) { - log.warn( - "There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration", - e); - if (isClosed) { - return; - } + } catch (NoNodeException e) { + log.debug( + "looks like we are going to be the leader for collection {} shard {}", + collection, + slice); + continue; + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); + } catch (SessionExpiredException e) { + // zk has to reconnect + throw e; + } catch (KeeperException e) { + log.warn("", e); + Thread.currentThread().interrupt(); + } + + final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName(); + try { + log.debug( + "calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", + coreZkNodeName, + collection, + slice); + waitForLeaderToSeeDownState(descriptor, coreZkNodeName); + } catch (Exception e) { + log.warn( + "There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration", + e); + if (isClosed) { + return; } } } @@ -666,16 +664,6 @@ public class ZkController implements Closeable { return sysPropsCacher; } - private void closeOutstandingElections( - final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean sessionExpired) { - List<CoreDescriptor> descriptors = registerOnReconnect.get(); - if (descriptors != null) { - for (CoreDescriptor descriptor : descriptors) { - closeExistingElectionContext(descriptor, sessionExpired); - } - } - } - private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessionExpired) { // look for old context - if we find it, cancel it String collection = cd.getCloudDescriptor().getCollectionName(); @@ -696,16 +684,6 @@ public class ZkController implements Closeable { return contextKey; } - private void markAllAsNotLeader(final Supplier<List<CoreDescriptor>> registerOnReconnect) { - List<CoreDescriptor> descriptors = registerOnReconnect.get(); - if (descriptors != null) { - for (CoreDescriptor descriptor : descriptors) { - descriptor.getCloudDescriptor().setLeader(false); - descriptor.getCloudDescriptor().setHasRegistered(false); - } - } - } - public void preClose() { this.isClosed = true; diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index 6dd7b2d571d..3dbd1652993 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -23,12 +23,9 @@ import java.io.File; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.file.Paths; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.solr.client.solrj.impl.SolrZkClientTimeout; import org.apache.solr.cloud.SolrZkServer; import org.apache.solr.cloud.ZkController; @@ -132,15 +129,8 @@ public class ZkContainer { "A chroot was specified in ZkHost but the znode doesn't exist. " + zookeeperHost); } - Supplier<List<CoreDescriptor>> descriptorsSupplier = - () -> - cc.getCores().stream() - .map(SolrCore::getCoreDescriptor) - .collect(Collectors.toList()); - ZkController zkController = - new ZkController( - cc, zookeeperHost, zkClientConnectTimeout, config, descriptorsSupplier); + new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config); if (zkRun != null) { if (StrUtils.isNotNullOrEmpty(System.getProperty(HTTPS_PORT_PROP))) { diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java index 5da2f862ea2..32f921747d9 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java @@ -73,10 +73,9 @@ public class LeaderElectionIntegrationTest extends SolrCloudTestCase { "shard1", jetty .getCoreContainer() - .getCores() + .getCoreDescriptors() .iterator() .next() - .getCoreDescriptor() .getCloudDescriptor() .getShardId()); String jettyNodeName = jetty.getNodeName(); // must get before shutdown diff --git a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java index a553e1064b0..2c678132a86 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java +++ b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java @@ -17,23 +17,16 @@ package org.apache.solr.cloud; import java.io.IOException; -import java.util.List; import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; import org.apache.solr.core.CloudConfig; import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.CoreDescriptor; public class MockSimpleZkController extends ZkController { public MockSimpleZkController( - CoreContainer cc, - String zkServerAddress, - int zkClientConnectTimeout, - CloudConfig cloudConfig, - Supplier<List<CoreDescriptor>> descriptorsSupplier) + CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig) throws InterruptedException, TimeoutException, IOException { - super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig, descriptorsSupplier); + super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig); } @Override diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java index a871fc7176c..2c8520ddc22 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java @@ -18,7 +18,6 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; import java.nio.file.Path; -import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.lucene.tests.util.LuceneTestCase.BadApple; @@ -63,8 +62,7 @@ public class TestLeaderElectionZkExpiry extends SolrTestCaseJ4 { ExecutorUtil.newMDCAwareSingleThreadExecutor( new SolrNamedThreadFactory(this.getTestName())); try (ZkController zkController = - new ZkController( - cc, server.getZkAddress(), 15000, cloudConfig, Collections::emptyList)) { + new ZkController(cc, server.getZkAddress(), 15000, cloudConfig)) { threadExecutor.execute( () -> { TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java index 6bda0dd6b53..c5be38b570c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java @@ -166,8 +166,7 @@ public class ZkControllerTest extends SolrCloudTestCase { try { CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983).build(); - zkController = - new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null); + zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig); } catch (IllegalArgumentException e) { fail("ZkController did not normalize host name correctly"); } finally { @@ -233,8 +232,7 @@ public class ZkControllerTest extends SolrCloudTestCase { Boolean.getBoolean("solr.distributedCollectionConfigSetExecution")) .build(); zkController = - new ZkController( - cc, cluster.getZkServer().getZkAddress(), TIMEOUT, cloudConfig, () -> null); + new ZkController(cc, cluster.getZkServer().getZkAddress(), TIMEOUT, cloudConfig); zkControllerRef.set(zkController); zkController @@ -399,7 +397,7 @@ public class ZkControllerTest extends SolrCloudTestCase { try { CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983).build(); try (ZkController zkController = - new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, () -> null)) { + new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig)) { final Path dir = createTempDir(); final String configsetName = "testconfigset"; try (ZkSolrResourceLoader loader = @@ -478,8 +476,7 @@ public class ZkControllerTest extends SolrCloudTestCase { cc, server.getZkAddress(), TIMEOUT, - new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983 + index).build(), - () -> null); + new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983 + index).build()); } catch (Exception e) { exception.compareAndSet(null, e); } finally {