Copilot commented on code in PR #2391:
URL: https://github.com/apache/solr/pull/2391#discussion_r3215665335
##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
}
}
/**
 * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each Solr node runs its own
 * embedded ZooKeeper server, and together they form a quorum.
 *
 * <p>Each node's embedded ZooKeeper is expected on its (randomly chosen) Solr port + 1000; the
 * resulting ensemble address is stored in {@code zkHost}.
 *
 * <p>NOTE(review): this constructor sets JVM-global system properties (for example {@code
 * solr.zookeeper.server.enabled} and {@code solr.node.roles}) and nothing here restores them, so
 * they may leak into later tests running in the same JVM. TODO: record the previous values and
 * restore them in {@link #shutdown()}.
 *
 * @param numServers number of Solr servers (must be at least 3 for quorum)
 * @param baseDir base directory that the mini cluster should be run from
 * @param solrXml solr.xml file content, or {@code null} to use {@code DEFAULT_CLOUD_SOLR_XML}
 * @param jettyConfig Jetty configuration
 * @param securityJson Optional security.json configuration
 * @param trackJettyMetrics whether to track Jetty metrics
 * @param useEmbeddedZkQuorum must be {@code true}; this constructor only supports quorum mode
 * @throws IllegalArgumentException if {@code useEmbeddedZkQuorum} is false or {@code numServers}
 *     is less than 3
 * @throws Exception if there was an error starting the cluster; nodes that were already started
 *     are shut down before the exception propagates
 */
MiniSolrCloudCluster(
    int numServers,
    Path baseDir,
    String solrXml,
    JettyConfig jettyConfig,
    Optional<String> securityJson,
    boolean trackJettyMetrics,
    boolean useEmbeddedZkQuorum)
    throws Exception {

  if (!useEmbeddedZkQuorum) {
    throw new IllegalArgumentException("This constructor is only for embedded ZK quorum mode");
  }
  if (numServers < 3) {
    throw new IllegalArgumentException(
        "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
  }

  Objects.requireNonNull(securityJson);
  this.baseDir = Objects.requireNonNull(baseDir);
  this.jettyConfig = Objects.requireNonNull(jettyConfig);
  this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
  this.trackJettyMetrics = trackJettyMetrics;
  this.externalZkServer = true; // No ZkTestServer in quorum mode
  this.zkServer = null; // No single ZK server

  log.info("Starting cluster of {} servers with embedded ZK quorum in {}", numServers, baseDir);
  Files.createDirectories(baseDir);

  // Phase 1: Reserve random ports for all nodes
  int[] ports = reservePortPairs(numServers);

  // Build the zkHost string with all ZK ports (Solr port + 1000)
  StringBuilder zkHostBuilder = new StringBuilder();
  for (int i = 0; i < numServers; i++) {
    if (i > 0) {
      zkHostBuilder.append(",");
    }
    int zkPort = ports[i] + 1000;
    zkHostBuilder.append("127.0.0.1:").append(zkPort);
  }
  this.zkHost = zkHostBuilder.toString(); // Save for later use

  if (log.isInfoEnabled()) {
    log.info("Reserved ports for {} nodes: {}", numServers, java.util.Arrays.toString(ports));
    log.info("ZK connection string: {}", this.zkHost);
  }

  // Set system properties for embedded ZK quorum mode.
  // TODO: these are JVM-global and are not restored on shutdown (see the constructor Javadoc).
  System.setProperty("solr.zookeeper.server.enabled", "true");
  System.setProperty("solr.security.manager.enabled", "false");
  System.setProperty("solr.node.roles", "data:on,overseer:allowed,zookeeper_quorum:on");
  System.setProperty("solr.test.sys.prop1", "propone");
  System.setProperty("solr.test.sys.prop2", "proptwo");
  System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes

  // Phase 2: Start all nodes in parallel
  List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
  for (int i = 0; i < numServers; i++) {
    final int solrPort = ports[i];
    final String nodeName = newNodeName();
    startups.add(
        () -> {
          Path runnerPath = createInstancePath(nodeName);
          // Use the field, not the constructor parameter: the parameter may be null, whereas
          // the field has already been defaulted to DEFAULT_CLOUD_SOLR_XML.
          Files.write(
              runnerPath.resolve("solr.xml"), this.solrXml.getBytes(StandardCharsets.UTF_8));

          Properties nodeProps = new Properties();
          nodeProps.setProperty("zkHost", this.zkHost);
          nodeProps.setProperty("hostPort", String.valueOf(solrPort));

          JettyConfig newConfig = JettyConfig.builder(jettyConfig).setPort(solrPort).build();

          JettySolrRunner jetty =
              !trackJettyMetrics
                  ? new JettySolrRunner(runnerPath.toString(), nodeProps, newConfig)
                  : new JettySolrRunnerWithMetrics(runnerPath.toString(), nodeProps, newConfig);

          int zkPort = solrPort + 1000;
          log.info("Starting {} on port {} with ZK on port {}", nodeName, solrPort, zkPort);
          jetty.start();
          log.info("Node {} started successfully", nodeName);

          jettys.add(jetty);
          synchronized (startupWait) {
            startupWait.notifyAll();
          }
          return jetty;
        });
  }

  final ExecutorService executorLauncher =
      ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("jetty-launcher"));
  Collection<Future<JettySolrRunner>> futures = executorLauncher.invokeAll(startups);
  ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
  Exception startupError =
      checkForExceptions("Error starting up MiniSolrCloudCluster with embedded ZK quorum", futures);
  if (startupError != null) {
    try {
      this.shutdown();
    } catch (Throwable t) {
      startupError.addSuppressed(t);
    }
    throw startupError;
  }

  try {
    log.info("All {} nodes started, waiting for quorum formation...", numServers);
    // Poll until the ensemble serves client requests instead of sleeping for a fixed interval,
    // which wastes time on fast machines and may still be too short on loaded CI hosts.
    awaitZkQuorum(60);

    // Initialize ZK paths and security (if provided)
    try (SolrZkClient zkClient =
        new SolrZkClient.Builder()
            .withUrl(this.zkHost)
            .withTimeout(60000, TimeUnit.MILLISECONDS)
            .build()) {
      if (!zkClient.exists("/solr")) {
        zkClient.makePath("/solr", true);
      }

      if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
        zkClient.makePath(
            "/solr" + ZkStateReader.CLUSTER_PROPS,
            "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8),
            true);
      }
      if (securityJson.isPresent()) {
        // JSON is exchanged as UTF-8; don't depend on the platform default charset
        zkClient.makePath(
            "/solr/security.json", securityJson.get().getBytes(StandardCharsets.UTF_8), true);
      }
    }

    solrClient = buildSolrClientForQuorum(this.zkHost);

    // numServers >= 3 is guaranteed by the argument check above
    waitForAllNodes(numServers, 60);
  } catch (Exception e) {
    // Don't leak the running Jetty/ZK nodes when a post-startup phase fails
    try {
      this.shutdown();
    } catch (Throwable t) {
      e.addSuppressed(t);
    }
    throw e;
  }

  log.info("Embedded ZK quorum cluster started successfully with {} nodes", numServers);
}

/**
 * Blocks until a ZooKeeper client can open a session against {@code zkHost} and read the root
 * znode, i.e. until the embedded ensemble is serving client requests.
 *
 * @param timeoutSeconds maximum time to wait
 * @throws TimeoutException if the ensemble is not reachable within the timeout; the last
 *     connection failure, if any, is attached as the cause
 * @throws InterruptedException if interrupted while waiting (the interrupt flag is restored)
 */
private void awaitZkQuorum(int timeoutSeconds) throws Exception {
  TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
  Exception lastError = null;
  while (!timeout.hasTimedOut()) {
    try (SolrZkClient probe =
        new SolrZkClient.Builder()
            .withUrl(this.zkHost)
            .withTimeout(15000, TimeUnit.MILLISECONDS)
            .build()) {
      probe.exists("/");
      log.info("ZK quorum at {} is serving requests", this.zkHost);
      return;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
        throw e;
      }
      lastError = e;
      log.debug("ZK quorum at {} not ready yet", this.zkHost, e);
    }
    Thread.sleep(250);
  }
  TimeoutException timeoutException =
      new TimeoutException(
          "ZooKeeper quorum at " + this.zkHost + " not ready after " + timeoutSeconds + "s");
  if (lastError != null) {
    timeoutException.initCause(lastError);
  }
  throw timeoutException;
}
+
+ /**
+ * Reserves port pairs for embedded ZK quorum mode. For each node, we need
both a Solr port and a
+ * ZK port (Solr port + 1000). This method ensures both ports in each pair
are available before
+ * returning.
+ *
+ * <p>The method keeps all ServerSockets open during the search to prevent
race conditions where
+ * another process might grab a port between our check and actual usage.
+ *
+ * @param numPairs the number of port pairs to reserve
+ * @return array of Solr ports (ZK ports are Solr port + 1000)
+ * @throws IOException if unable to find enough available port pairs
+ */
+ private int[] reservePortPairs(int numPairs) throws IOException {
+ List<ServerSocket> solrSockets = new ArrayList<>();
+ List<ServerSocket> zkSockets = new ArrayList<>();
+ int[] ports = new int[numPairs];
+
+ try {
+ int pairsFound = 0;
+ int maxAttempts = numPairs * 100; // Reasonable limit to avoid infinite
loops
+ int attempts = 0;
+
+ while (pairsFound < numPairs && attempts < maxAttempts) {
+ attempts++;
+ ServerSocket solrSocket = null;
+ ServerSocket zkSocket = null;
+
+ try {
+ // Try to get a random available port for Solr
+ solrSocket = new ServerSocket(0);
+ int solrPort = solrSocket.getLocalPort();
+ int zkPort = solrPort + 1000;
+
+ // Check if ZK port would exceed the valid port range (0-65535)
+ if (zkPort > 65535) {
+ solrSocket.close();
+ continue; // Skip this port and try again
+ }
+
+ // Verify the corresponding ZK port is also available
+ zkSocket = new ServerSocket(zkPort);
+
+ // Both ports are available - keep the sockets and record the port
+ solrSockets.add(solrSocket);
+ zkSockets.add(zkSocket);
+ ports[pairsFound] = solrPort;
+ pairsFound++;
+
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Reserved port pair {}/{}: Solr={}, ZK={}", pairsFound,
numPairs, solrPort, zkPort);
+ }
+
+ } catch (IOException | IllegalArgumentException e) {
+ // ZK port was not available or invalid, close sockets and try again
+ if (solrSocket != null) {
+ try {
+ solrSocket.close();
+ } catch (IOException ignored) {
+ }
+ }
+ if (zkSocket != null) {
+ try {
+ zkSocket.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+ }
+
+ if (pairsFound < numPairs) {
+ throw new IOException(
+ "Unable to find " + numPairs + " available port pairs after " +
attempts + " attempts");
+ }
+ return ports;
+
+ } finally {
+ // Close all sockets now that we've recorded the ports
+ // The ports will remain available for immediate reuse
+ for (ServerSocket socket : solrSockets) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ log.warn("Error closing Solr socket", e);
+ }
+ }
+ for (ServerSocket socket : zkSockets) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ log.warn("Error closing ZK socket", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the ZK connection string. Works for both standard mode (using
zkServer) and quorum mode
+ * (using zkHost field).
+ *
+ * @return ZK connection string
+ */
+ private String getZkAddress() {
+ if (zkHost != null) {
+ return zkHost; // Quorum mode
+ }
+ return zkServer.getZkAddress(); // Standard mode
Review Comment:
`zkServer` is null in embedded-quorum mode, but `newSolrClient(String)`
still calls `getZkServer().getZkAddress()` (see `newSolrClient`, around line
1079). That will throw an NPE for any caller using `cluster.newSolrClient(...)`
with quorum mode enabled. Update `newSolrClient` (and any other callers) to use
`getZkAddress()` instead of `getZkServer()`, and/or guard against `zkServer ==
null`.
##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -389,6 +663,87 @@ public void waitForNode(JettySolrRunner jetty, int
timeoutSeconds)
timeoutSeconds, TimeUnit.SECONDS, (o, n) -> n != null &&
n.contains(nodeName));
}
+ /**
+ * Wait for the expected number of live nodes in the cluster.
+ *
+ * @param expectedCount expected number of live nodes
+ * @param timeoutSeconds timeout in seconds
+ * @throws InterruptedException if interrupted while waiting
+ * @throws TimeoutException if the expected count is not reached within the
timeout
+ */
+ public void waitForLiveNodes(int expectedCount, int timeoutSeconds)
+ throws InterruptedException, TimeoutException {
+ TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS,
TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut()) {
+ long runningNodes =
jettys.stream().filter(JettySolrRunner::isRunning).count();
+ if (runningNodes == expectedCount) {
+ log.info("Verified {} live nodes", runningNodes);
+ return;
+ }
+ Thread.sleep(200);
+ }
+ // Final check after timeout
+ long actualCount =
jettys.stream().filter(JettySolrRunner::isRunning).count();
+ throw new TimeoutException(
+ "Live node count mismatch: expected " + expectedCount + " but got " +
actualCount);
Review Comment:
This `waitForLiveNodes` implementation counts locally running Jetty
processes, but the method name/usage implies ZooKeeper `live_nodes` (and
callers use it to wait for node loss/join). Since `stopJettySolrRunner(...)`
removes jettys from the list, this can return immediately without waiting for
ZooKeeper state to update. Consider implementing this using
`getZkStateReader().waitForLiveNodes(...)` (similar to `waitForJettyToStop`)
and/or rename to reflect what it actually checks.
##########
solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test embedded ZooKeeper running in quorum mode within Solr nodes.
+ *
+ * <p>This test verifies that:
+ *
+ * <ul>
+ * <li>Multiple Solr nodes can start with embedded ZK in quorum mode
+ * <li>The ZK quorum forms correctly
+ * <li>Collections can be created and used
+ * <li>Documents can be indexed and queried
+ * <li>All resources are properly closed on shutdown
+ * </ul>
+ */
[email protected]
+public class TestEmbeddedZkQuorum extends SolrCloudTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION_NAME = "test_quorum_collection";
+ private static final int NUM_NODES = 3;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // Disable ZooKeeper JMX to avoid MBean registration conflicts during
beasting
Review Comment:
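Typo in comment: "during beasting" should be "during testing" (or similar).
(Note: "beasting" is established Lucene/Solr jargon for running a single test
many times in a row, e.g. via the Gradle `beast` task, so the original wording
may be intentional.)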
##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
}
}
/**
 * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each Solr node runs its own
 * embedded ZooKeeper server, and together they form a quorum.
 *
 * <p>Each node's embedded ZooKeeper is expected on its (randomly chosen) Solr port + 1000; the
 * resulting ensemble address is stored in {@code zkHost}.
 *
 * <p>NOTE(review): this constructor sets JVM-global system properties (for example {@code
 * solr.zookeeper.server.enabled} and {@code solr.node.roles}) and nothing here restores them, so
 * they may leak into later tests running in the same JVM. TODO: record the previous values and
 * restore them in {@link #shutdown()}.
 *
 * @param numServers number of Solr servers (must be at least 3 for quorum)
 * @param baseDir base directory that the mini cluster should be run from
 * @param solrXml solr.xml file content, or {@code null} to use {@code DEFAULT_CLOUD_SOLR_XML}
 * @param jettyConfig Jetty configuration
 * @param securityJson Optional security.json configuration
 * @param trackJettyMetrics whether to track Jetty metrics
 * @param useEmbeddedZkQuorum must be {@code true}; this constructor only supports quorum mode
 * @throws IllegalArgumentException if {@code useEmbeddedZkQuorum} is false or {@code numServers}
 *     is less than 3
 * @throws Exception if there was an error starting the cluster; nodes that were already started
 *     are shut down before the exception propagates
 */
MiniSolrCloudCluster(
    int numServers,
    Path baseDir,
    String solrXml,
    JettyConfig jettyConfig,
    Optional<String> securityJson,
    boolean trackJettyMetrics,
    boolean useEmbeddedZkQuorum)
    throws Exception {

  if (!useEmbeddedZkQuorum) {
    throw new IllegalArgumentException("This constructor is only for embedded ZK quorum mode");
  }
  if (numServers < 3) {
    throw new IllegalArgumentException(
        "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
  }

  Objects.requireNonNull(securityJson);
  this.baseDir = Objects.requireNonNull(baseDir);
  this.jettyConfig = Objects.requireNonNull(jettyConfig);
  this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
  this.trackJettyMetrics = trackJettyMetrics;
  this.externalZkServer = true; // No ZkTestServer in quorum mode
  this.zkServer = null; // No single ZK server

  log.info("Starting cluster of {} servers with embedded ZK quorum in {}", numServers, baseDir);
  Files.createDirectories(baseDir);

  // Phase 1: Reserve random ports for all nodes
  int[] ports = reservePortPairs(numServers);

  // Build the zkHost string with all ZK ports (Solr port + 1000)
  StringBuilder zkHostBuilder = new StringBuilder();
  for (int i = 0; i < numServers; i++) {
    if (i > 0) {
      zkHostBuilder.append(",");
    }
    int zkPort = ports[i] + 1000;
    zkHostBuilder.append("127.0.0.1:").append(zkPort);
  }
  this.zkHost = zkHostBuilder.toString(); // Save for later use

  if (log.isInfoEnabled()) {
    log.info("Reserved ports for {} nodes: {}", numServers, java.util.Arrays.toString(ports));
    log.info("ZK connection string: {}", this.zkHost);
  }

  // Set system properties for embedded ZK quorum mode.
  // TODO: these are JVM-global and are not restored on shutdown (see the constructor Javadoc).
  System.setProperty("solr.zookeeper.server.enabled", "true");
  System.setProperty("solr.security.manager.enabled", "false");
  System.setProperty("solr.node.roles", "data:on,overseer:allowed,zookeeper_quorum:on");
  System.setProperty("solr.test.sys.prop1", "propone");
  System.setProperty("solr.test.sys.prop2", "proptwo");
  System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes

  // Phase 2: Start all nodes in parallel
  List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
  for (int i = 0; i < numServers; i++) {
    final int solrPort = ports[i];
    final String nodeName = newNodeName();
    startups.add(
        () -> {
          Path runnerPath = createInstancePath(nodeName);
          // Use the field, not the constructor parameter: the parameter may be null, whereas
          // the field has already been defaulted to DEFAULT_CLOUD_SOLR_XML.
          Files.write(
              runnerPath.resolve("solr.xml"), this.solrXml.getBytes(StandardCharsets.UTF_8));

          Properties nodeProps = new Properties();
          nodeProps.setProperty("zkHost", this.zkHost);
          nodeProps.setProperty("hostPort", String.valueOf(solrPort));

          JettyConfig newConfig = JettyConfig.builder(jettyConfig).setPort(solrPort).build();

          JettySolrRunner jetty =
              !trackJettyMetrics
                  ? new JettySolrRunner(runnerPath.toString(), nodeProps, newConfig)
                  : new JettySolrRunnerWithMetrics(runnerPath.toString(), nodeProps, newConfig);

          int zkPort = solrPort + 1000;
          log.info("Starting {} on port {} with ZK on port {}", nodeName, solrPort, zkPort);
          jetty.start();
          log.info("Node {} started successfully", nodeName);

          jettys.add(jetty);
          synchronized (startupWait) {
            startupWait.notifyAll();
          }
          return jetty;
        });
  }

  final ExecutorService executorLauncher =
      ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("jetty-launcher"));
  Collection<Future<JettySolrRunner>> futures = executorLauncher.invokeAll(startups);
  ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
  Exception startupError =
      checkForExceptions("Error starting up MiniSolrCloudCluster with embedded ZK quorum", futures);
  if (startupError != null) {
    try {
      this.shutdown();
    } catch (Throwable t) {
      startupError.addSuppressed(t);
    }
    throw startupError;
  }

  try {
    log.info("All {} nodes started, waiting for quorum formation...", numServers);
    // Poll until the ensemble serves client requests instead of sleeping for a fixed interval,
    // which wastes time on fast machines and may still be too short on loaded CI hosts.
    awaitZkQuorum(60);

    // Initialize ZK paths and security (if provided)
    try (SolrZkClient zkClient =
        new SolrZkClient.Builder()
            .withUrl(this.zkHost)
            .withTimeout(60000, TimeUnit.MILLISECONDS)
            .build()) {
      if (!zkClient.exists("/solr")) {
        zkClient.makePath("/solr", true);
      }

      if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
        zkClient.makePath(
            "/solr" + ZkStateReader.CLUSTER_PROPS,
            "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8),
            true);
      }
      if (securityJson.isPresent()) {
        // JSON is exchanged as UTF-8; don't depend on the platform default charset
        zkClient.makePath(
            "/solr/security.json", securityJson.get().getBytes(StandardCharsets.UTF_8), true);
      }
    }

    solrClient = buildSolrClientForQuorum(this.zkHost);

    // numServers >= 3 is guaranteed by the argument check above
    waitForAllNodes(numServers, 60);
  } catch (Exception e) {
    // Don't leak the running Jetty/ZK nodes when a post-startup phase fails
    try {
      this.shutdown();
    } catch (Throwable t) {
      e.addSuppressed(t);
    }
    throw e;
  }

  log.info("Embedded ZK quorum cluster started successfully with {} nodes", numServers);
}

/**
 * Blocks until a ZooKeeper client can open a session against {@code zkHost} and read the root
 * znode, i.e. until the embedded ensemble is serving client requests.
 *
 * @param timeoutSeconds maximum time to wait
 * @throws TimeoutException if the ensemble is not reachable within the timeout; the last
 *     connection failure, if any, is attached as the cause
 * @throws InterruptedException if interrupted while waiting (the interrupt flag is restored)
 */
private void awaitZkQuorum(int timeoutSeconds) throws Exception {
  TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
  Exception lastError = null;
  while (!timeout.hasTimedOut()) {
    try (SolrZkClient probe =
        new SolrZkClient.Builder()
            .withUrl(this.zkHost)
            .withTimeout(15000, TimeUnit.MILLISECONDS)
            .build()) {
      probe.exists("/");
      log.info("ZK quorum at {} is serving requests", this.zkHost);
      return;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
        throw e;
      }
      lastError = e;
      log.debug("ZK quorum at {} not ready yet", this.zkHost, e);
    }
    Thread.sleep(250);
  }
  TimeoutException timeoutException =
      new TimeoutException(
          "ZooKeeper quorum at " + this.zkHost + " not ready after " + timeoutSeconds + "s");
  if (lastError != null) {
    timeoutException.initCause(lastError);
  }
  throw timeoutException;
}
+
+ /**
+ * Reserves port pairs for embedded ZK quorum mode. For each node, we need
both a Solr port and a
+ * ZK port (Solr port + 1000). This method ensures both ports in each pair
are available before
+ * returning.
+ *
+ * <p>The method keeps all ServerSockets open during the search to prevent
race conditions where
+ * another process might grab a port between our check and actual usage.
+ *
Review Comment:
The Javadoc claims the method "keeps all ServerSockets open" to prevent
races, but the `finally` block closes all sockets before the caller starts
Jetty/ZK. Closing the sockets releases the ports, so another process/thread can
still grab them before use. Either keep the sockets open until after the
servers have bound (and close them later), or adjust the comment/approach
(e.g., let Jetty/ZK bind to ephemeral ports and discover them).
##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
}
}
+ /**
+ * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each
Solr node runs its own
+ * embedded ZooKeeper server, and together they form a quorum.
+ *
+ * @param numServers number of Solr servers (must be at least 3 for quorum)
+ * @param baseDir base directory that the mini cluster should be run from
+ * @param solrXml solr.xml file content
+ * @param jettyConfig Jetty configuration
+ * @param securityJson Optional security.json configuration
+ * @param trackJettyMetrics whether to track Jetty metrics
+ * @throws Exception if there was an error starting the cluster
+ */
+ MiniSolrCloudCluster(
+ int numServers,
+ Path baseDir,
+ String solrXml,
+ JettyConfig jettyConfig,
+ Optional<String> securityJson,
+ boolean trackJettyMetrics,
+ boolean useEmbeddedZkQuorum)
+ throws Exception {
+
+ if (!useEmbeddedZkQuorum) {
+ throw new IllegalArgumentException("This constructor is only for
embedded ZK quorum mode");
+ }
+ if (numServers < 3) {
+ throw new IllegalArgumentException(
+ "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+ }
+
+ Objects.requireNonNull(securityJson);
+ this.baseDir = Objects.requireNonNull(baseDir);
+ this.jettyConfig = Objects.requireNonNull(jettyConfig);
+ this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+ this.trackJettyMetrics = trackJettyMetrics;
+ this.externalZkServer = true; // No ZkTestServer in quorum mode
+ this.zkServer = null; // No single ZK server
+
+ log.info("Starting cluster of {} servers with embedded ZK quorum in {}",
numServers, baseDir);
+ Files.createDirectories(baseDir);
+
+ // Phase 1: Reserve random ports for all nodes
+ int[] ports = reservePortPairs(numServers);
+
+ // Build the zkHost string with all ZK ports (Solr port + 1000)
+ StringBuilder zkHostBuilder = new StringBuilder();
+ for (int i = 0; i < numServers; i++) {
+ if (i > 0) {
+ zkHostBuilder.append(",");
+ }
+ int zkPort = ports[i] + 1000;
+ zkHostBuilder.append("127.0.0.1:").append(zkPort);
+ }
+ this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+ if (log.isInfoEnabled()) {
+ log.info("Reserved ports for {} nodes: {}", numServers,
java.util.Arrays.toString(ports));
+ log.info("ZK connection string: {}", this.zkHost);
+ }
+
+ // Set system properties for embedded ZK quorum mode
+ System.setProperty("solr.zookeeper.server.enabled", "true");
+ System.setProperty("solr.security.manager.enabled", "false");
+ System.setProperty("solr.node.roles",
"data:on,overseer:allowed,zookeeper_quorum:on");
+ System.setProperty("solr.test.sys.prop1", "propone");
+ System.setProperty("solr.test.sys.prop2", "proptwo");
+ System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
+ // Phase 2: Start all nodes in parallel
+ List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
+ for (int i = 0; i < numServers; i++) {
+ final int solrPort = ports[i];
+ final String nodeName = newNodeName();
+ startups.add(
+ () -> {
+ Path runnerPath = createInstancePath(nodeName);
+ Files.write(runnerPath.resolve("solr.xml"),
solrXml.getBytes(StandardCharsets.UTF_8));
+
+ Properties nodeProps = new Properties();
+ nodeProps.setProperty("zkHost", this.zkHost);
+ nodeProps.setProperty("hostPort", String.valueOf(solrPort));
+
+ JettyConfig newConfig =
JettyConfig.builder(jettyConfig).setPort(solrPort).build();
+
+ JettySolrRunner jetty =
+ !trackJettyMetrics
+ ? new JettySolrRunner(runnerPath.toString(), nodeProps,
newConfig)
+ : new JettySolrRunnerWithMetrics(runnerPath.toString(),
nodeProps, newConfig);
+
+ int zkPort = solrPort + 1000;
+ log.info("Starting {} on port {} with ZK on port {}", nodeName,
solrPort, zkPort);
+ jetty.start();
+ log.info("Node {} started successfully", nodeName);
+
+ jettys.add(jetty);
+ synchronized (startupWait) {
+ startupWait.notifyAll();
+ }
+ return jetty;
+ });
+ }
+
+ final ExecutorService executorLauncher =
+ ExecutorUtil.newMDCAwareCachedThreadPool(new
SolrNamedThreadFactory("jetty-launcher"));
+ Collection<Future<JettySolrRunner>> futures =
executorLauncher.invokeAll(startups);
+ ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+ Exception startupError =
+ checkForExceptions(
+ "Error starting up MiniSolrCloudCluster with embedded ZK quorum",
futures);
+ if (startupError != null) {
+ try {
+ this.shutdown();
+ } catch (Throwable t) {
+ startupError.addSuppressed(t);
+ }
+ throw startupError;
+ }
+
+ log.info("All {} nodes started, waiting for quorum formation...",
numServers);
+ Thread.sleep(10000); // Wait for ZK quorum to fully form
Review Comment:
The fixed `Thread.sleep(10000)` makes cluster startup timing-dependent and
can be flaky on slow/loaded CI machines. Prefer waiting on a concrete condition
(e.g., connect with retry to `zkHost`, or use `ZkStateReader.waitForLiveNodes`
/ ensemble-ready checks) instead of a hard sleep.
##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
}
}
+ /**
+ * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each
Solr node runs its own
+ * embedded ZooKeeper server, and together they form a quorum.
+ *
+ * @param numServers number of Solr servers (must be at least 3 for quorum)
+ * @param baseDir base directory that the mini cluster should be run from
+ * @param solrXml solr.xml file content
+ * @param jettyConfig Jetty configuration
+ * @param securityJson Optional security.json configuration
+ * @param trackJettyMetrics whether to track Jetty metrics
+ * @throws Exception if there was an error starting the cluster
+ */
+ MiniSolrCloudCluster(
+ int numServers,
+ Path baseDir,
+ String solrXml,
+ JettyConfig jettyConfig,
+ Optional<String> securityJson,
+ boolean trackJettyMetrics,
+ boolean useEmbeddedZkQuorum)
+ throws Exception {
+
+ if (!useEmbeddedZkQuorum) {
+ throw new IllegalArgumentException("This constructor is only for
embedded ZK quorum mode");
+ }
+ if (numServers < 3) {
+ throw new IllegalArgumentException(
+ "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+ }
+
+ Objects.requireNonNull(securityJson);
+ this.baseDir = Objects.requireNonNull(baseDir);
+ this.jettyConfig = Objects.requireNonNull(jettyConfig);
+ this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+ this.trackJettyMetrics = trackJettyMetrics;
+ this.externalZkServer = true; // No ZkTestServer in quorum mode
+ this.zkServer = null; // No single ZK server
+
+ log.info("Starting cluster of {} servers with embedded ZK quorum in {}",
numServers, baseDir);
+ Files.createDirectories(baseDir);
+
+ // Phase 1: Reserve random ports for all nodes
+ int[] ports = reservePortPairs(numServers);
+
+ // Build the zkHost string with all ZK ports (Solr port + 1000)
+ StringBuilder zkHostBuilder = new StringBuilder();
+ for (int i = 0; i < numServers; i++) {
+ if (i > 0) {
+ zkHostBuilder.append(",");
+ }
+ int zkPort = ports[i] + 1000;
+ zkHostBuilder.append("127.0.0.1:").append(zkPort);
+ }
+ this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+ if (log.isInfoEnabled()) {
+ log.info("Reserved ports for {} nodes: {}", numServers,
java.util.Arrays.toString(ports));
+ log.info("ZK connection string: {}", this.zkHost);
+ }
+
+ // Set system properties for embedded ZK quorum mode
+ System.setProperty("solr.zookeeper.server.enabled", "true");
+ System.setProperty("solr.security.manager.enabled", "false");
+ System.setProperty("solr.node.roles",
"data:on,overseer:allowed,zookeeper_quorum:on");
+ System.setProperty("solr.test.sys.prop1", "propone");
+ System.setProperty("solr.test.sys.prop2", "proptwo");
+ System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
Review Comment:
This constructor sets multiple global `System` properties (e.g.,
`solr.zookeeper.server.enabled`, `solr.security.manager.enabled`,
`solr.node.roles`) but never restores the previous values on shutdown. Since
MiniSolrCloudCluster is used across many tests in the same JVM, these
properties can leak into subsequent tests and change their behavior. Capture
previous values and restore/clear them in `shutdown()` (or avoid using global
sysprops if possible).
##########
solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test embedded ZooKeeper running in quorum mode within Solr nodes.
+ *
+ * <p>This test verifies that:
+ *
+ * <ul>
+ * <li>Multiple Solr nodes can start with embedded ZK in quorum mode
+ * <li>The ZK quorum forms correctly
+ * <li>Collections can be created and used
+ * <li>Documents can be indexed and queried
+ * <li>All resources are properly closed on shutdown
+ * </ul>
+ */
[email protected]
+public class TestEmbeddedZkQuorum extends SolrCloudTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION_NAME = "test_quorum_collection";
+ private static final int NUM_NODES = 3;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // Disable ZooKeeper JMX to avoid MBean registration conflicts during
beasting
+ System.setProperty("zookeeper.jmx.log4j.disable", "true");
+
+ // Get path to a test config
+ Path configPath = TEST_PATH().resolve("collection1").resolve("conf");
+
+ // Configure cluster with 3 nodes, each running embedded ZK
+ cluster =
+ configureCluster(NUM_NODES).addConfig("conf1",
configPath).withEmbeddedZkQuorum().build();
+ cluster.waitForAllNodes(60);
+ }
+
+ @Test
+ public void testBasicQuorumFunctionality()
+ throws IOException, InterruptedException, TimeoutException {
+ for (int i = 0; i < NUM_NODES; i++) {
+ JettySolrRunner node = cluster.getJettySolrRunner(i);
+ assertTrue("Node " + i + " should be running", node.isRunning());
+ assertNotNull("Node " + i + " should have a NodeName",
node.getNodeName());
+ }
+ }
+
+ @Test
+ public void testCollectionIndexing() throws Exception {
+ try (CloudSolrClient client = cluster.getSolrClient(COLLECTION_NAME)) {
+ CollectionAdminRequest.Create createCmd =
+ CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", 1,
3);
+ createCmd.process(client);
+ cluster.waitForActiveCollection(COLLECTION_NAME, 1, 3);
+
+ // Index some documents
+ for (int i = 0; i < 10; i++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", i);
+ doc.addField("title_s", "Test Document " + i);
+ doc.addField("content_t", "This is test content for document " + i);
+ client.add(doc);
+ }
+ client.commit();
+
+ // Query the documents
+ SolrQuery query = new SolrQuery("*:*");
+ query.setRows(100);
+ QueryResponse response = client.query(query);
+ SolrDocumentList results = response.getResults();
+
+ // Verify results
+ assertEquals("Should have 10 documents", 10, results.getNumFound());
+
+ CollectionAdminRequest.Delete deleteCmd =
+ CollectionAdminRequest.deleteCollection(COLLECTION_NAME);
+ deleteCmd.process(client);
+ }
+ }
+
+ /**
+ * Tests ZK quorum resilience when a single node fails and recovers.
+ *
+ * <p>This test verifies that:
+ *
+ * <ul>
+ * <li>A 3-node ZK quorum can lose 1 node and maintain quorum (2/3)
+ * <li>The cluster continues to accept writes with 2 nodes
+ * <li>A failed node can rejoin the quorum using the same ports
+ * <li>All data is preserved after node recovery
+ * </ul>
+ *
+ * <p>This test creates its own private cluster to avoid interfering with
other tests.
+ */
+ @Test
+ public void testQuorumResilienceWithNodeFailure() throws Exception {
+ final String collectionName = "quorum_resilience";
+ final int initialDocs = 5;
+ final int docsWhileDown = 5;
+ final int docsAfterRecovery = 5;
+
+ // Create a private cluster for this test
+ Path configPath = TEST_PATH().resolve("collection1").resolve("conf");
+ MiniSolrCloudCluster privateCluster =
+ configureCluster(NUM_NODES).addConfig("conf1",
configPath).withEmbeddedZkQuorum().build();
+
+ try {
+ privateCluster.waitForAllNodes(60);
+
+ // Create collection with replica on each node
+ CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 3)
+ .process(privateCluster.getSolrClient());
+ privateCluster.waitForActiveCollection(collectionName, 1, 3);
+
+ try (CloudSolrClient client =
privateCluster.getSolrClient(collectionName)) {
+ // Index initial documents and verify
+ indexDocuments(client, 0, initialDocs, "initial");
+ privateCluster.waitForDocCount(
+ collectionName, initialDocs, "initial documents", 120,
TimeUnit.SECONDS);
+
+ // Stop one node (quorum maintained with 2/3 nodes)
+ JettySolrRunner stoppedNode = privateCluster.getJettySolrRunner(2);
+ String stoppedNodeName = stoppedNode.getNodeName();
+ if (log.isInfoEnabled()) {
+ log.info("Stopping node to test quorum resilience: {}",
stoppedNodeName);
+ }
+ privateCluster.stopJettySolrRunner(stoppedNode);
+
+ // Wait for ZK to detect node loss and verify cluster still operational
+ privateCluster.waitForLiveNodes(2, 120);
+ indexDocuments(client, initialDocs, docsWhileDown, "during_failure");
+ privateCluster.waitForDocCount(
+ collectionName,
+ initialDocs + docsWhileDown,
+ "documents while node down",
+ 120,
+ TimeUnit.SECONDS);
+ if (log.isInfoEnabled()) {
+ log.info("Starting node {} again and testing functionality",
stoppedNodeName);
+ }
+
+ privateCluster.startJettySolrRunner(stoppedNode, true);
+ privateCluster.waitForNode(stoppedNode, 120);
+
+ // Wait for cluster to stabilize and verify all nodes running
+ privateCluster.waitForLiveNodes(3, 120);
+
+ // CRITICAL: Wait for collection to become active (replicas up, leader
elected)
+ // before attempting to index documents
+ privateCluster.waitForActiveCollection(collectionName, 120,
TimeUnit.SECONDS, 1, 3);
+
+ privateCluster.waitForDocCount(
+ collectionName,
+ initialDocs + docsWhileDown,
+ "documents after recovery",
+ 120,
+ TimeUnit.SECONDS);
+
+ // Verify full cluster functionality by adding more documents
+ indexDocuments(client, initialDocs + docsWhileDown, docsAfterRecovery,
"after_recovery");
+ privateCluster.waitForDocCount(
+ collectionName,
+ initialDocs + docsWhileDown + docsAfterRecovery,
+ "all documents",
+ 120,
+ TimeUnit.SECONDS);
+ }
+ } finally {
+ CollectionAdminRequest.deleteCollection(collectionName)
+ .process(privateCluster.getSolrClient());
+ privateCluster.shutdown();
+ }
+ }
+
+ /**
+ * Tests ZK quorum loss and recovery when majority of nodes fail.
+ *
+ * <p>This test verifies that:
+ *
+ * <ul>
+ * <li>A 3-node ZK quorum loses quorum when 2 nodes are down (1/3
remaining)
+ * <li>The surviving node maintains its replica but cannot process updates
without quorum
+ * <li>Both failed nodes can be restarted to restore quorum
+ * <li>The cluster becomes operational again (can query and index
documents)
+ * <li>Note: After catastrophic failure, some replicas may need time or
manual intervention to
+ * fully recover
+ * </ul>
+ *
+ * <p>This test creates its own private cluster to avoid interfering with
other tests. <b>Hard to
+ * make this test pass</b>
+ */
+ @AwaitsFix(bugUrl = "https://example.com/foo")
Review Comment:
`@AwaitsFix` uses a placeholder URL (`https://example.com/foo`), and the
Javadoc itself says the test is "Hard to make this test pass". In this
codebase, `@AwaitsFix` links to a real Jira issue (see other tests). Please
either provide the real SOLR issue URL describing the failure mode, or remove
the test until it is reliable. Otherwise this adds a test that is permanently
ignored (and flaky if re-enabled), with nothing tracking the fix.
##########
solr/core/src/java/org/apache/solr/core/ZkContainer.java:
##########
@@ -70,42 +80,105 @@ public class ZkContainer {
public ZkContainer() {}
public void initZooKeeper(final CoreContainer cc, CloudConfig config) {
- boolean zkRun =
EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+ // zkServerEnabled is set whenever in solrCloud mode ('-c') but no
explicit zkHost/ZK_HOST is
+ // provided.
+ final boolean zkServerEnabled =
+ EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+ boolean zkQuorumNode = false;
+ if
(NodeRoles.MODE_ON.equals(cc.nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM)))
{
+ zkQuorumNode = true;
+ log.info("Starting node in ZooKeeper Quorum role.");
+ }
- if (zkRun && config == null)
+ if (zkServerEnabled && config == null) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Cannot start Solr in cloud mode - no cloud config provided");
+ }
+
+ if (config == null) {
+ log.info("Solr is running in standalone mode");
+ return;
+ }
- if (config == null) return; // not in zk mode
+ final boolean runAsQuorum = config.getZkHost() != null && zkQuorumNode;
String zookeeperHost = config.getZkHost();
+ final var solrHome = cc.getSolrHome();
+ if (zkServerEnabled) {
+ if (!runAsQuorum) {
+ // Old school ZooKeeperServerMain being used under the covers.
+ zkServer =
+ SolrZkServer.createAndStart(config.getZkHost(), solrHome,
config.getSolrHostPort());
+
+ // set client from server config if not already set
+ if (zookeeperHost == null) {
+ zookeeperHost = zkServer.getClientString();
+ }
+ } else {
+ // ZooKeeperServerEmbedded being used under the covers.
+ // Figure out where to put zoo-data
+ final var zkHomeDir = solrHome.resolve("zoo_home");
+ final var zkDataDir = zkHomeDir.resolve("data");
+
+ // Populate a zoo.cfg
+ final String zooCfgTemplate =
+ ""
+ + "tickTime=2000\n"
+ + "initLimit=10\n"
+ + "syncLimit=5\n"
+ + "dataDir=@@DATA_DIR@@\n"
+ + "4lw.commands.whitelist=mntr,conf,ruok\n"
+ + "admin.enableServer=false\n"
+ + "clientPort=@@ZK_CLIENT_PORT@@\n";
+
+ final int zkPort = config.getSolrHostPort() + 1000;
+ String zooCfgContents =
+ zooCfgTemplate
+ .replace("@@DATA_DIR@@", zkDataDir.toString())
+ .replace("@@ZK_CLIENT_PORT@@", String.valueOf(zkPort));
+ final String[] zkHosts = config.getZkHost().split(",");
+ int myId = -1;
+ final String targetConnStringSection = config.getHost() + ":" + zkPort;
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Trying to match {} against zkHostString {} to determine myid",
+ targetConnStringSection,
+ config.getZkHost());
+ }
+ for (int i = 0; i < zkHosts.length; i++) {
+ final String host = zkHosts[i];
+ if (targetConnStringSection.equals(zkHosts[i])) {
+ myId = (i + 1);
+ }
+ final var hostComponents = host.split(":");
+ final var zkServer = hostComponents[0];
+ final var zkClientPort = Integer.valueOf(hostComponents[1]);
+ final var zkQuorumPort = zkClientPort - 4000;
+ final var zkLeaderPort = zkClientPort - 3000;
Review Comment:
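The quorum and leader ports written into each `server.N=...` entry
(`zkQuorumPort` / `zkLeaderPort`) are derived by subtracting constants from the
client port (`clientPort - 4000` / `clientPort - 3000`). Because the client port
is itself `solrPort + 1000`, these resolve to `solrPort - 3000` and
`solrPort - 2000`. This doesn't match the port scheme used elsewhere in Solr's
embedded-ZK code (`SolrZkServerProps.injectServers` uses `clientPort+1` and
`clientPort+2`). It also produces invalid ports for low Solr ports: any Solr
port of 3000 or below yields a quorum port of 0 or less. This can prevent the
ZooKeeper quorum from forming reliably. Use an explicit, consistent mapping for
peer/election ports (and ensure those ports are available).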
##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
}
}
+ /**
+ * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each
Solr node runs its own
+ * embedded ZooKeeper server, and together they form a quorum.
+ *
+ * @param numServers number of Solr servers (must be at least 3 for quorum)
+ * @param baseDir base directory that the mini cluster should be run from
+ * @param solrXml solr.xml file content
+ * @param jettyConfig Jetty configuration
+ * @param securityJson Optional security.json configuration
+ * @param trackJettyMetrics whether to track Jetty metrics
+ * @throws Exception if there was an error starting the cluster
+ */
+ MiniSolrCloudCluster(
+ int numServers,
+ Path baseDir,
+ String solrXml,
+ JettyConfig jettyConfig,
+ Optional<String> securityJson,
+ boolean trackJettyMetrics,
+ boolean useEmbeddedZkQuorum)
+ throws Exception {
+
+ if (!useEmbeddedZkQuorum) {
+ throw new IllegalArgumentException("This constructor is only for
embedded ZK quorum mode");
+ }
+ if (numServers < 3) {
+ throw new IllegalArgumentException(
+ "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+ }
+
+ Objects.requireNonNull(securityJson);
+ this.baseDir = Objects.requireNonNull(baseDir);
+ this.jettyConfig = Objects.requireNonNull(jettyConfig);
+ this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+ this.trackJettyMetrics = trackJettyMetrics;
+ this.externalZkServer = true; // No ZkTestServer in quorum mode
+ this.zkServer = null; // No single ZK server
+
+ log.info("Starting cluster of {} servers with embedded ZK quorum in {}",
numServers, baseDir);
+ Files.createDirectories(baseDir);
+
+ // Phase 1: Reserve random ports for all nodes
+ int[] ports = reservePortPairs(numServers);
+
+ // Build the zkHost string with all ZK ports (Solr port + 1000)
+ StringBuilder zkHostBuilder = new StringBuilder();
+ for (int i = 0; i < numServers; i++) {
+ if (i > 0) {
+ zkHostBuilder.append(",");
+ }
+ int zkPort = ports[i] + 1000;
+ zkHostBuilder.append("127.0.0.1:").append(zkPort);
+ }
+ this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+ if (log.isInfoEnabled()) {
+ log.info("Reserved ports for {} nodes: {}", numServers,
java.util.Arrays.toString(ports));
+ log.info("ZK connection string: {}", this.zkHost);
+ }
+
+ // Set system properties for embedded ZK quorum mode
+ System.setProperty("solr.zookeeper.server.enabled", "true");
+ System.setProperty("solr.security.manager.enabled", "false");
+ System.setProperty("solr.node.roles",
"data:on,overseer:allowed,zookeeper_quorum:on");
+ System.setProperty("solr.test.sys.prop1", "propone");
+ System.setProperty("solr.test.sys.prop2", "proptwo");
+ System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
+ // Phase 2: Start all nodes in parallel
+ List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
+ for (int i = 0; i < numServers; i++) {
+ final int solrPort = ports[i];
+ final String nodeName = newNodeName();
+ startups.add(
+ () -> {
+ Path runnerPath = createInstancePath(nodeName);
+ Files.write(runnerPath.resolve("solr.xml"),
solrXml.getBytes(StandardCharsets.UTF_8));
+
+ Properties nodeProps = new Properties();
+ nodeProps.setProperty("zkHost", this.zkHost);
+ nodeProps.setProperty("hostPort", String.valueOf(solrPort));
+
+ JettyConfig newConfig =
JettyConfig.builder(jettyConfig).setPort(solrPort).build();
+
+ JettySolrRunner jetty =
+ !trackJettyMetrics
+ ? new JettySolrRunner(runnerPath.toString(), nodeProps,
newConfig)
+ : new JettySolrRunnerWithMetrics(runnerPath.toString(),
nodeProps, newConfig);
+
+ int zkPort = solrPort + 1000;
+ log.info("Starting {} on port {} with ZK on port {}", nodeName,
solrPort, zkPort);
+ jetty.start();
+ log.info("Node {} started successfully", nodeName);
+
+ jettys.add(jetty);
+ synchronized (startupWait) {
+ startupWait.notifyAll();
+ }
+ return jetty;
+ });
+ }
+
+ final ExecutorService executorLauncher =
+ ExecutorUtil.newMDCAwareCachedThreadPool(new
SolrNamedThreadFactory("jetty-launcher"));
+ Collection<Future<JettySolrRunner>> futures =
executorLauncher.invokeAll(startups);
+ ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+ Exception startupError =
+ checkForExceptions(
+ "Error starting up MiniSolrCloudCluster with embedded ZK quorum",
futures);
+ if (startupError != null) {
+ try {
+ this.shutdown();
+ } catch (Throwable t) {
+ startupError.addSuppressed(t);
+ }
+ throw startupError;
+ }
+
+ log.info("All {} nodes started, waiting for quorum formation...",
numServers);
+ Thread.sleep(10000); // Wait for ZK quorum to fully form
+
+ // Initialize ZK paths and security (if provided)
+ try (SolrZkClient zkClient =
+ new SolrZkClient.Builder()
+ .withUrl(this.zkHost)
+ .withTimeout(60000, TimeUnit.MILLISECONDS)
+ .build()) {
+ if (!zkClient.exists("/solr")) {
+ zkClient.makePath("/solr", true);
+ }
+
+ if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
+ zkClient.makePath(
+ "/solr" + ZkStateReader.CLUSTER_PROPS,
+ "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8),
+ true);
+ }
+ if (securityJson.isPresent()) {
+ zkClient.makePath(
+ "/solr/security.json",
securityJson.get().getBytes(Charset.defaultCharset()), true);
+ }
+ }
+
+ solrClient = buildSolrClientForQuorum(this.zkHost);
+
+ if (numServers > 0) {
+ waitForAllNodes(numServers, 60);
+ }
+
+ log.info("Embedded ZK quorum cluster started successfully with {} nodes",
numServers);
+ }
+
+ /**
+ * Reserves port pairs for embedded ZK quorum mode. For each node, we need
both a Solr port and a
+ * ZK port (Solr port + 1000). This method ensures both ports in each pair
are available before
+ * returning.
+ *
+ * <p>The method keeps all ServerSockets open during the search to prevent
race conditions where
+ * another process might grab a port between our check and actual usage.
+ *
+ * @param numPairs the number of port pairs to reserve
+ * @return array of Solr ports (ZK ports are Solr port + 1000)
+ * @throws IOException if unable to find enough available port pairs
+ */
+ private int[] reservePortPairs(int numPairs) throws IOException {
+ List<ServerSocket> solrSockets = new ArrayList<>();
+ List<ServerSocket> zkSockets = new ArrayList<>();
+ int[] ports = new int[numPairs];
+
+ try {
+ int pairsFound = 0;
+ int maxAttempts = numPairs * 100; // Reasonable limit to avoid infinite
loops
+ int attempts = 0;
+
+ while (pairsFound < numPairs && attempts < maxAttempts) {
+ attempts++;
+ ServerSocket solrSocket = null;
+ ServerSocket zkSocket = null;
+
+ try {
+ // Try to get a random available port for Solr
+ solrSocket = new ServerSocket(0);
+ int solrPort = solrSocket.getLocalPort();
+ int zkPort = solrPort + 1000;
+
+ // Check if ZK port would exceed the valid port range (0-65535)
+ if (zkPort > 65535) {
+ solrSocket.close();
+ continue; // Skip this port and try again
+ }
+
+ // Verify the corresponding ZK port is also available
+ zkSocket = new ServerSocket(zkPort);
+
+ // Both ports are available - keep the sockets and record the port
+ solrSockets.add(solrSocket);
+ zkSockets.add(zkSocket);
+ ports[pairsFound] = solrPort;
+ pairsFound++;
+
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Reserved port pair {}/{}: Solr={}, ZK={}", pairsFound,
numPairs, solrPort, zkPort);
+ }
+
+ } catch (IOException | IllegalArgumentException e) {
+ // ZK port was not available or invalid, close sockets and try again
+ if (solrSocket != null) {
+ try {
+ solrSocket.close();
+ } catch (IOException ignored) {
+ }
+ }
+ if (zkSocket != null) {
+ try {
+ zkSocket.close();
+ } catch (IOException ignored) {
+ }
+ }
Review Comment:
`reservePortPairs` only verifies that the Solr port and the ZooKeeper client
port (`solrPort + 1000`) are available. A ZooKeeper quorum peer also binds
additional ports for quorum and leader-election traffic. With the scheme in
`ZkContainer` these are `clientPort - 4000` and `clientPort - 3000` (i.e.
`solrPort - 3000` and `solrPort - 2000`); the typical layout is `clientPort+1`
and `clientPort+2`. If either of those ports is already in use, the embedded ZK
server can fail to bind even though this method succeeded. Consider
reserving/checking those ports too, or making the port scheme explicit.
##########
solr/core/src/java/org/apache/solr/core/ZkContainer.java:
##########
@@ -70,42 +80,105 @@ public class ZkContainer {
public ZkContainer() {}
public void initZooKeeper(final CoreContainer cc, CloudConfig config) {
- boolean zkRun =
EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+ // zkServerEnabled is set whenever in solrCloud mode ('-c') but no
explicit zkHost/ZK_HOST is
+ // provided.
+ final boolean zkServerEnabled =
+ EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+ boolean zkQuorumNode = false;
+ if
(NodeRoles.MODE_ON.equals(cc.nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM)))
{
+ zkQuorumNode = true;
+ log.info("Starting node in ZooKeeper Quorum role.");
+ }
- if (zkRun && config == null)
+ if (zkServerEnabled && config == null) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Cannot start Solr in cloud mode - no cloud config provided");
+ }
+
+ if (config == null) {
+ log.info("Solr is running in standalone mode");
+ return;
+ }
- if (config == null) return; // not in zk mode
+ final boolean runAsQuorum = config.getZkHost() != null && zkQuorumNode;
String zookeeperHost = config.getZkHost();
+ final var solrHome = cc.getSolrHome();
+ if (zkServerEnabled) {
+ if (!runAsQuorum) {
+ // Old school ZooKeeperServerMain being used under the covers.
+ zkServer =
+ SolrZkServer.createAndStart(config.getZkHost(), solrHome,
config.getSolrHostPort());
+
+ // set client from server config if not already set
+ if (zookeeperHost == null) {
+ zookeeperHost = zkServer.getClientString();
+ }
+ } else {
+ // ZooKeeperServerEmbedded being used under the covers.
+ // Figure out where to put zoo-data
+ final var zkHomeDir = solrHome.resolve("zoo_home");
+ final var zkDataDir = zkHomeDir.resolve("data");
+
+ // Populate a zoo.cfg
+ final String zooCfgTemplate =
+ ""
+ + "tickTime=2000\n"
+ + "initLimit=10\n"
+ + "syncLimit=5\n"
+ + "dataDir=@@DATA_DIR@@\n"
+ + "4lw.commands.whitelist=mntr,conf,ruok\n"
+ + "admin.enableServer=false\n"
+ + "clientPort=@@ZK_CLIENT_PORT@@\n";
+
+ final int zkPort = config.getSolrHostPort() + 1000;
+ String zooCfgContents =
+ zooCfgTemplate
+ .replace("@@DATA_DIR@@", zkDataDir.toString())
+ .replace("@@ZK_CLIENT_PORT@@", String.valueOf(zkPort));
+ final String[] zkHosts = config.getZkHost().split(",");
+ int myId = -1;
+ final String targetConnStringSection = config.getHost() + ":" + zkPort;
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Trying to match {} against zkHostString {} to determine myid",
+ targetConnStringSection,
+ config.getZkHost());
+ }
+ for (int i = 0; i < zkHosts.length; i++) {
+ final String host = zkHosts[i];
+ if (targetConnStringSection.equals(zkHosts[i])) {
+ myId = (i + 1);
+ }
+ final var hostComponents = host.split(":");
+ final var zkServer = hostComponents[0];
+ final var zkClientPort = Integer.valueOf(hostComponents[1]);
+ final var zkQuorumPort = zkClientPort - 4000;
+ final var zkLeaderPort = zkClientPort - 3000;
+ final String configEntry =
+ "server." + (i + 1) + "=" + zkServer + ":" + zkQuorumPort + ":"
+ zkLeaderPort + "\n";
+ zooCfgContents = zooCfgContents + configEntry;
+ }
+
+ if (myId == -1) {
+ throw new IllegalStateException(
+ "Unable to determine ZK 'myid' for target " +
targetConnStringSection);
+ }
Review Comment:
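`myId` detection requires an exact string match between
`config.getHost() + ":" + zkPort` and one of the comma-separated `zkHost`
entries. This fails in common cases, such as `zkHost` using `localhost` while
Solr's configured host is `127.0.0.1`, and startup then aborts with
`Unable to determine ZK 'myid'`. The same loop also assumes every entry is a
bare `host:port`. A chroot suffix on the last entry (e.g. `127.0.0.1:9983/solr`)
makes `Integer.valueOf(hostComponents[1])` throw `NumberFormatException`.
Consider matching by port (and/or resolving hostnames and treating loopback
addresses as equivalent) instead of requiring exact host string equality.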
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]