Copilot commented on code in PR #2391:
URL: https://github.com/apache/solr/pull/2391#discussion_r3215665335


##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
     }
   }
 
+  /**
+   * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each 
Solr node runs its own
+   * embedded ZooKeeper server, and together they form a quorum.
+   *
+   * @param numServers number of Solr servers (must be at least 3 for quorum)
+   * @param baseDir base directory that the mini cluster should be run from
+   * @param solrXml solr.xml file content
+   * @param jettyConfig Jetty configuration
+   * @param securityJson Optional security.json configuration
+   * @param trackJettyMetrics whether to track Jetty metrics
+   * @throws Exception if there was an error starting the cluster
+   */
+  MiniSolrCloudCluster(
+      int numServers,
+      Path baseDir,
+      String solrXml,
+      JettyConfig jettyConfig,
+      Optional<String> securityJson,
+      boolean trackJettyMetrics,
+      boolean useEmbeddedZkQuorum)
+      throws Exception {
+
+    if (!useEmbeddedZkQuorum) {
+      throw new IllegalArgumentException("This constructor is only for 
embedded ZK quorum mode");
+    }
+    if (numServers < 3) {
+      throw new IllegalArgumentException(
+          "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+    }
+
+    Objects.requireNonNull(securityJson);
+    this.baseDir = Objects.requireNonNull(baseDir);
+    this.jettyConfig = Objects.requireNonNull(jettyConfig);
+    this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+    this.trackJettyMetrics = trackJettyMetrics;
+    this.externalZkServer = true; // No ZkTestServer in quorum mode
+    this.zkServer = null; // No single ZK server
+
+    log.info("Starting cluster of {} servers with embedded ZK quorum in {}", 
numServers, baseDir);
+    Files.createDirectories(baseDir);
+
+    // Phase 1: Reserve random ports for all nodes
+    int[] ports = reservePortPairs(numServers);
+
+    // Build the zkHost string with all ZK ports (Solr port + 1000)
+    StringBuilder zkHostBuilder = new StringBuilder();
+    for (int i = 0; i < numServers; i++) {
+      if (i > 0) {
+        zkHostBuilder.append(",");
+      }
+      int zkPort = ports[i] + 1000;
+      zkHostBuilder.append("127.0.0.1:").append(zkPort);
+    }
+    this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+    if (log.isInfoEnabled()) {
+      log.info("Reserved ports for {} nodes: {}", numServers, 
java.util.Arrays.toString(ports));
+      log.info("ZK connection string: {}", this.zkHost);
+    }
+
+    // Set system properties for embedded ZK quorum mode
+    System.setProperty("solr.zookeeper.server.enabled", "true");
+    System.setProperty("solr.security.manager.enabled", "false");
+    System.setProperty("solr.node.roles", 
"data:on,overseer:allowed,zookeeper_quorum:on");
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
+    // Phase 2: Start all nodes in parallel
+    List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
+    for (int i = 0; i < numServers; i++) {
+      final int solrPort = ports[i];
+      final String nodeName = newNodeName();
+      startups.add(
+          () -> {
+            Path runnerPath = createInstancePath(nodeName);
+            Files.write(runnerPath.resolve("solr.xml"), 
solrXml.getBytes(StandardCharsets.UTF_8));
+
+            Properties nodeProps = new Properties();
+            nodeProps.setProperty("zkHost", this.zkHost);
+            nodeProps.setProperty("hostPort", String.valueOf(solrPort));
+
+            JettyConfig newConfig = 
JettyConfig.builder(jettyConfig).setPort(solrPort).build();
+
+            JettySolrRunner jetty =
+                !trackJettyMetrics
+                    ? new JettySolrRunner(runnerPath.toString(), nodeProps, 
newConfig)
+                    : new JettySolrRunnerWithMetrics(runnerPath.toString(), 
nodeProps, newConfig);
+
+            int zkPort = solrPort + 1000;
+            log.info("Starting {} on port {} with ZK on port {}", nodeName, 
solrPort, zkPort);
+            jetty.start();
+            log.info("Node {} started successfully", nodeName);
+
+            jettys.add(jetty);
+            synchronized (startupWait) {
+              startupWait.notifyAll();
+            }
+            return jetty;
+          });
+    }
+
+    final ExecutorService executorLauncher =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new 
SolrNamedThreadFactory("jetty-launcher"));
+    Collection<Future<JettySolrRunner>> futures = 
executorLauncher.invokeAll(startups);
+    ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+    Exception startupError =
+        checkForExceptions(
+            "Error starting up MiniSolrCloudCluster with embedded ZK quorum", 
futures);
+    if (startupError != null) {
+      try {
+        this.shutdown();
+      } catch (Throwable t) {
+        startupError.addSuppressed(t);
+      }
+      throw startupError;
+    }
+
+    log.info("All {} nodes started, waiting for quorum formation...", 
numServers);
+    Thread.sleep(10000); // Wait for ZK quorum to fully form
+
+    // Initialize ZK paths and security (if provided)
+    try (SolrZkClient zkClient =
+        new SolrZkClient.Builder()
+            .withUrl(this.zkHost)
+            .withTimeout(60000, TimeUnit.MILLISECONDS)
+            .build()) {
+      if (!zkClient.exists("/solr")) {
+        zkClient.makePath("/solr", true);
+      }
+
+      if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
+        zkClient.makePath(
+            "/solr" + ZkStateReader.CLUSTER_PROPS,
+            "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8),
+            true);
+      }
+      if (securityJson.isPresent()) {
+        zkClient.makePath(
+            "/solr/security.json", 
securityJson.get().getBytes(Charset.defaultCharset()), true);
+      }
+    }
+
+    solrClient = buildSolrClientForQuorum(this.zkHost);
+
+    if (numServers > 0) {
+      waitForAllNodes(numServers, 60);
+    }
+
+    log.info("Embedded ZK quorum cluster started successfully with {} nodes", 
numServers);
+  }
+
+  /**
+   * Reserves port pairs for embedded ZK quorum mode. For each node, we need 
both a Solr port and a
+   * ZK port (Solr port + 1000). This method ensures both ports in each pair 
are available before
+   * returning.
+   *
+   * <p>The method keeps all ServerSockets open during the search to prevent 
race conditions where
+   * another process might grab a port between our check and actual usage.
+   *
+   * @param numPairs the number of port pairs to reserve
+   * @return array of Solr ports (ZK ports are Solr port + 1000)
+   * @throws IOException if unable to find enough available port pairs
+   */
+  private int[] reservePortPairs(int numPairs) throws IOException {
+    List<ServerSocket> solrSockets = new ArrayList<>();
+    List<ServerSocket> zkSockets = new ArrayList<>();
+    int[] ports = new int[numPairs];
+
+    try {
+      int pairsFound = 0;
+      int maxAttempts = numPairs * 100; // Reasonable limit to avoid infinite 
loops
+      int attempts = 0;
+
+      while (pairsFound < numPairs && attempts < maxAttempts) {
+        attempts++;
+        ServerSocket solrSocket = null;
+        ServerSocket zkSocket = null;
+
+        try {
+          // Try to get a random available port for Solr
+          solrSocket = new ServerSocket(0);
+          int solrPort = solrSocket.getLocalPort();
+          int zkPort = solrPort + 1000;
+
+          // Check if ZK port would exceed the valid port range (0-65535)
+          if (zkPort > 65535) {
+            solrSocket.close();
+            continue; // Skip this port and try again
+          }
+
+          // Verify the corresponding ZK port is also available
+          zkSocket = new ServerSocket(zkPort);
+
+          // Both ports are available - keep the sockets and record the port
+          solrSockets.add(solrSocket);
+          zkSockets.add(zkSocket);
+          ports[pairsFound] = solrPort;
+          pairsFound++;
+
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Reserved port pair {}/{}: Solr={}, ZK={}", pairsFound, 
numPairs, solrPort, zkPort);
+          }
+
+        } catch (IOException | IllegalArgumentException e) {
+          // ZK port was not available or invalid, close sockets and try again
+          if (solrSocket != null) {
+            try {
+              solrSocket.close();
+            } catch (IOException ignored) {
+            }
+          }
+          if (zkSocket != null) {
+            try {
+              zkSocket.close();
+            } catch (IOException ignored) {
+            }
+          }
+        }
+      }
+
+      if (pairsFound < numPairs) {
+        throw new IOException(
+            "Unable to find " + numPairs + " available port pairs after " + 
attempts + " attempts");
+      }
+      return ports;
+
+    } finally {
+      // Close all sockets now that we've recorded the ports
+      // The ports will remain available for immediate reuse
+      for (ServerSocket socket : solrSockets) {
+        try {
+          socket.close();
+        } catch (IOException e) {
+          log.warn("Error closing Solr socket", e);
+        }
+      }
+      for (ServerSocket socket : zkSockets) {
+        try {
+          socket.close();
+        } catch (IOException e) {
+          log.warn("Error closing ZK socket", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the ZK connection string. Works for both standard mode (using 
zkServer) and quorum mode
+   * (using zkHost field).
+   *
+   * @return ZK connection string
+   */
+  private String getZkAddress() {
+    if (zkHost != null) {
+      return zkHost; // Quorum mode
+    }
+    return zkServer.getZkAddress(); // Standard mode

Review Comment:
   `zkServer` is null in embedded-quorum mode, but `newSolrClient(String)` 
still calls `getZkServer().getZkAddress()` (see `newSolrClient` near line 
1079). That will throw a `NullPointerException` for any caller using 
`cluster.newSolrClient(...)` with quorum mode enabled. Update `newSolrClient` 
(and any other callers) to use `getZkAddress()` instead of 
`getZkServer().getZkAddress()`, and/or guard against `zkServer == null`.
   



##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -389,6 +663,87 @@ public void waitForNode(JettySolrRunner jetty, int 
timeoutSeconds)
             timeoutSeconds, TimeUnit.SECONDS, (o, n) -> n != null && 
n.contains(nodeName));
   }
 
+  /**
+   * Wait for the expected number of live nodes in the cluster.
+   *
+   * @param expectedCount expected number of live nodes
+   * @param timeoutSeconds timeout in seconds
+   * @throws InterruptedException if interrupted while waiting
+   * @throws TimeoutException if the expected count is not reached within the 
timeout
+   */
+  public void waitForLiveNodes(int expectedCount, int timeoutSeconds)
+      throws InterruptedException, TimeoutException {
+    TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS, 
TimeSource.NANO_TIME);
+    while (!timeout.hasTimedOut()) {
+      long runningNodes = 
jettys.stream().filter(JettySolrRunner::isRunning).count();
+      if (runningNodes == expectedCount) {
+        log.info("Verified {} live nodes", runningNodes);
+        return;
+      }
+      Thread.sleep(200);
+    }
+    // Final check after timeout
+    long actualCount = 
jettys.stream().filter(JettySolrRunner::isRunning).count();
+    throw new TimeoutException(
+        "Live node count mismatch: expected " + expectedCount + " but got " + 
actualCount);

Review Comment:
   This `waitForLiveNodes` implementation counts locally running Jetty 
instances, but the method name/usage implies ZooKeeper `live_nodes` (and 
callers use it to wait for node loss/join). Since `stopJettySolrRunner(...)` 
removes jettys from the list, this can return immediately without waiting for 
ZooKeeper state to update. Consider implementing this using 
`getZkStateReader().waitForLiveNodes(...)` (similar to `waitForJettyToStop`) 
and/or renaming it to reflect what it actually checks.
   



##########
solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test embedded ZooKeeper running in quorum mode within Solr nodes.
+ *
+ * <p>This test verifies that:
+ *
+ * <ul>
+ *   <li>Multiple Solr nodes can start with embedded ZK in quorum mode
+ *   <li>The ZK quorum forms correctly
+ *   <li>Collections can be created and used
+ *   <li>Documents can be indexed and queried
+ *   <li>All resources are properly closed on shutdown
+ * </ul>
+ */
+@SolrTestCaseJ4.SuppressSSL
+public class TestEmbeddedZkQuorum extends SolrCloudTestCase {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String COLLECTION_NAME = "test_quorum_collection";
+  private static final int NUM_NODES = 3;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    // Disable ZooKeeper JMX to avoid MBean registration conflicts during 
beasting

Review Comment:
   Typo in comment: "during beasting" should be "during testing" (or similar).
   



##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
     }
   }
 
+  /**
+   * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each 
Solr node runs its own
+   * embedded ZooKeeper server, and together they form a quorum.
+   *
+   * @param numServers number of Solr servers (must be at least 3 for quorum)
+   * @param baseDir base directory that the mini cluster should be run from
+   * @param solrXml solr.xml file content
+   * @param jettyConfig Jetty configuration
+   * @param securityJson Optional security.json configuration
+   * @param trackJettyMetrics whether to track Jetty metrics
+   * @throws Exception if there was an error starting the cluster
+   */
+  MiniSolrCloudCluster(
+      int numServers,
+      Path baseDir,
+      String solrXml,
+      JettyConfig jettyConfig,
+      Optional<String> securityJson,
+      boolean trackJettyMetrics,
+      boolean useEmbeddedZkQuorum)
+      throws Exception {
+
+    if (!useEmbeddedZkQuorum) {
+      throw new IllegalArgumentException("This constructor is only for 
embedded ZK quorum mode");
+    }
+    if (numServers < 3) {
+      throw new IllegalArgumentException(
+          "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+    }
+
+    Objects.requireNonNull(securityJson);
+    this.baseDir = Objects.requireNonNull(baseDir);
+    this.jettyConfig = Objects.requireNonNull(jettyConfig);
+    this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+    this.trackJettyMetrics = trackJettyMetrics;
+    this.externalZkServer = true; // No ZkTestServer in quorum mode
+    this.zkServer = null; // No single ZK server
+
+    log.info("Starting cluster of {} servers with embedded ZK quorum in {}", 
numServers, baseDir);
+    Files.createDirectories(baseDir);
+
+    // Phase 1: Reserve random ports for all nodes
+    int[] ports = reservePortPairs(numServers);
+
+    // Build the zkHost string with all ZK ports (Solr port + 1000)
+    StringBuilder zkHostBuilder = new StringBuilder();
+    for (int i = 0; i < numServers; i++) {
+      if (i > 0) {
+        zkHostBuilder.append(",");
+      }
+      int zkPort = ports[i] + 1000;
+      zkHostBuilder.append("127.0.0.1:").append(zkPort);
+    }
+    this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+    if (log.isInfoEnabled()) {
+      log.info("Reserved ports for {} nodes: {}", numServers, 
java.util.Arrays.toString(ports));
+      log.info("ZK connection string: {}", this.zkHost);
+    }
+
+    // Set system properties for embedded ZK quorum mode
+    System.setProperty("solr.zookeeper.server.enabled", "true");
+    System.setProperty("solr.security.manager.enabled", "false");
+    System.setProperty("solr.node.roles", 
"data:on,overseer:allowed,zookeeper_quorum:on");
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
+    // Phase 2: Start all nodes in parallel
+    List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
+    for (int i = 0; i < numServers; i++) {
+      final int solrPort = ports[i];
+      final String nodeName = newNodeName();
+      startups.add(
+          () -> {
+            Path runnerPath = createInstancePath(nodeName);
+            Files.write(runnerPath.resolve("solr.xml"), 
solrXml.getBytes(StandardCharsets.UTF_8));
+
+            Properties nodeProps = new Properties();
+            nodeProps.setProperty("zkHost", this.zkHost);
+            nodeProps.setProperty("hostPort", String.valueOf(solrPort));
+
+            JettyConfig newConfig = 
JettyConfig.builder(jettyConfig).setPort(solrPort).build();
+
+            JettySolrRunner jetty =
+                !trackJettyMetrics
+                    ? new JettySolrRunner(runnerPath.toString(), nodeProps, 
newConfig)
+                    : new JettySolrRunnerWithMetrics(runnerPath.toString(), 
nodeProps, newConfig);
+
+            int zkPort = solrPort + 1000;
+            log.info("Starting {} on port {} with ZK on port {}", nodeName, 
solrPort, zkPort);
+            jetty.start();
+            log.info("Node {} started successfully", nodeName);
+
+            jettys.add(jetty);
+            synchronized (startupWait) {
+              startupWait.notifyAll();
+            }
+            return jetty;
+          });
+    }
+
+    final ExecutorService executorLauncher =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new 
SolrNamedThreadFactory("jetty-launcher"));
+    Collection<Future<JettySolrRunner>> futures = 
executorLauncher.invokeAll(startups);
+    ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+    Exception startupError =
+        checkForExceptions(
+            "Error starting up MiniSolrCloudCluster with embedded ZK quorum", 
futures);
+    if (startupError != null) {
+      try {
+        this.shutdown();
+      } catch (Throwable t) {
+        startupError.addSuppressed(t);
+      }
+      throw startupError;
+    }
+
+    log.info("All {} nodes started, waiting for quorum formation...", 
numServers);
+    Thread.sleep(10000); // Wait for ZK quorum to fully form
+
+    // Initialize ZK paths and security (if provided)
+    try (SolrZkClient zkClient =
+        new SolrZkClient.Builder()
+            .withUrl(this.zkHost)
+            .withTimeout(60000, TimeUnit.MILLISECONDS)
+            .build()) {
+      if (!zkClient.exists("/solr")) {
+        zkClient.makePath("/solr", true);
+      }
+
+      if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
+        zkClient.makePath(
+            "/solr" + ZkStateReader.CLUSTER_PROPS,
+            "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8),
+            true);
+      }
+      if (securityJson.isPresent()) {
+        zkClient.makePath(
+            "/solr/security.json", 
securityJson.get().getBytes(Charset.defaultCharset()), true);
+      }
+    }
+
+    solrClient = buildSolrClientForQuorum(this.zkHost);
+
+    if (numServers > 0) {
+      waitForAllNodes(numServers, 60);
+    }
+
+    log.info("Embedded ZK quorum cluster started successfully with {} nodes", 
numServers);
+  }
+
+  /**
+   * Reserves port pairs for embedded ZK quorum mode. For each node, we need 
both a Solr port and a
+   * ZK port (Solr port + 1000). This method ensures both ports in each pair 
are available before
+   * returning.
+   *
+   * <p>The method keeps all ServerSockets open during the search to prevent 
race conditions where
+   * another process might grab a port between our check and actual usage.
+   *

Review Comment:
   The Javadoc claims the method "keeps all ServerSockets open" to prevent 
races, but the `finally` block closes all sockets before the caller starts 
Jetty/ZK. Closing the sockets releases the ports, so another process/thread can 
still grab them before use. Either keep the sockets open until after the 
servers have bound (and close them later), or adjust the comment/approach 
(e.g., let Jetty/ZK bind to ephemeral ports and discover them).



##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
     }
   }
 
+  /**
+   * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each 
Solr node runs its own
+   * embedded ZooKeeper server, and together they form a quorum.
+   *
+   * @param numServers number of Solr servers (must be at least 3 for quorum)
+   * @param baseDir base directory that the mini cluster should be run from
+   * @param solrXml solr.xml file content
+   * @param jettyConfig Jetty configuration
+   * @param securityJson Optional security.json configuration
+   * @param trackJettyMetrics whether to track Jetty metrics
+   * @throws Exception if there was an error starting the cluster
+   */
+  MiniSolrCloudCluster(
+      int numServers,
+      Path baseDir,
+      String solrXml,
+      JettyConfig jettyConfig,
+      Optional<String> securityJson,
+      boolean trackJettyMetrics,
+      boolean useEmbeddedZkQuorum)
+      throws Exception {
+
+    if (!useEmbeddedZkQuorum) {
+      throw new IllegalArgumentException("This constructor is only for 
embedded ZK quorum mode");
+    }
+    if (numServers < 3) {
+      throw new IllegalArgumentException(
+          "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+    }
+
+    Objects.requireNonNull(securityJson);
+    this.baseDir = Objects.requireNonNull(baseDir);
+    this.jettyConfig = Objects.requireNonNull(jettyConfig);
+    this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+    this.trackJettyMetrics = trackJettyMetrics;
+    this.externalZkServer = true; // No ZkTestServer in quorum mode
+    this.zkServer = null; // No single ZK server
+
+    log.info("Starting cluster of {} servers with embedded ZK quorum in {}", 
numServers, baseDir);
+    Files.createDirectories(baseDir);
+
+    // Phase 1: Reserve random ports for all nodes
+    int[] ports = reservePortPairs(numServers);
+
+    // Build the zkHost string with all ZK ports (Solr port + 1000)
+    StringBuilder zkHostBuilder = new StringBuilder();
+    for (int i = 0; i < numServers; i++) {
+      if (i > 0) {
+        zkHostBuilder.append(",");
+      }
+      int zkPort = ports[i] + 1000;
+      zkHostBuilder.append("127.0.0.1:").append(zkPort);
+    }
+    this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+    if (log.isInfoEnabled()) {
+      log.info("Reserved ports for {} nodes: {}", numServers, 
java.util.Arrays.toString(ports));
+      log.info("ZK connection string: {}", this.zkHost);
+    }
+
+    // Set system properties for embedded ZK quorum mode
+    System.setProperty("solr.zookeeper.server.enabled", "true");
+    System.setProperty("solr.security.manager.enabled", "false");
+    System.setProperty("solr.node.roles", 
"data:on,overseer:allowed,zookeeper_quorum:on");
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
+    // Phase 2: Start all nodes in parallel
+    List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
+    for (int i = 0; i < numServers; i++) {
+      final int solrPort = ports[i];
+      final String nodeName = newNodeName();
+      startups.add(
+          () -> {
+            Path runnerPath = createInstancePath(nodeName);
+            Files.write(runnerPath.resolve("solr.xml"), 
solrXml.getBytes(StandardCharsets.UTF_8));
+
+            Properties nodeProps = new Properties();
+            nodeProps.setProperty("zkHost", this.zkHost);
+            nodeProps.setProperty("hostPort", String.valueOf(solrPort));
+
+            JettyConfig newConfig = 
JettyConfig.builder(jettyConfig).setPort(solrPort).build();
+
+            JettySolrRunner jetty =
+                !trackJettyMetrics
+                    ? new JettySolrRunner(runnerPath.toString(), nodeProps, 
newConfig)
+                    : new JettySolrRunnerWithMetrics(runnerPath.toString(), 
nodeProps, newConfig);
+
+            int zkPort = solrPort + 1000;
+            log.info("Starting {} on port {} with ZK on port {}", nodeName, 
solrPort, zkPort);
+            jetty.start();
+            log.info("Node {} started successfully", nodeName);
+
+            jettys.add(jetty);
+            synchronized (startupWait) {
+              startupWait.notifyAll();
+            }
+            return jetty;
+          });
+    }
+
+    final ExecutorService executorLauncher =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new 
SolrNamedThreadFactory("jetty-launcher"));
+    Collection<Future<JettySolrRunner>> futures = 
executorLauncher.invokeAll(startups);
+    ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+    Exception startupError =
+        checkForExceptions(
+            "Error starting up MiniSolrCloudCluster with embedded ZK quorum", 
futures);
+    if (startupError != null) {
+      try {
+        this.shutdown();
+      } catch (Throwable t) {
+        startupError.addSuppressed(t);
+      }
+      throw startupError;
+    }
+
+    log.info("All {} nodes started, waiting for quorum formation...", 
numServers);
+    Thread.sleep(10000); // Wait for ZK quorum to fully form

Review Comment:
   The fixed `Thread.sleep(10000)` makes cluster startup timing-dependent and 
can be flaky on slow/loaded CI machines. Prefer waiting on a concrete condition 
(e.g., connect with retry to `zkHost`, or use `ZkStateReader.waitForLiveNodes` 
/ ensemble-ready checks) instead of a hard sleep.
   



##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
     }
   }
 
+  /**
+   * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each 
Solr node runs its own
+   * embedded ZooKeeper server, and together they form a quorum.
+   *
+   * @param numServers number of Solr servers (must be at least 3 for quorum)
+   * @param baseDir base directory that the mini cluster should be run from
+   * @param solrXml solr.xml file content
+   * @param jettyConfig Jetty configuration
+   * @param securityJson Optional security.json configuration
+   * @param trackJettyMetrics whether to track Jetty metrics
+   * @throws Exception if there was an error starting the cluster
+   */
+  MiniSolrCloudCluster(
+      int numServers,
+      Path baseDir,
+      String solrXml,
+      JettyConfig jettyConfig,
+      Optional<String> securityJson,
+      boolean trackJettyMetrics,
+      boolean useEmbeddedZkQuorum)
+      throws Exception {
+
+    if (!useEmbeddedZkQuorum) {
+      throw new IllegalArgumentException("This constructor is only for 
embedded ZK quorum mode");
+    }
+    if (numServers < 3) {
+      throw new IllegalArgumentException(
+          "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+    }
+
+    Objects.requireNonNull(securityJson);
+    this.baseDir = Objects.requireNonNull(baseDir);
+    this.jettyConfig = Objects.requireNonNull(jettyConfig);
+    this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+    this.trackJettyMetrics = trackJettyMetrics;
+    this.externalZkServer = true; // No ZkTestServer in quorum mode
+    this.zkServer = null; // No single ZK server
+
+    log.info("Starting cluster of {} servers with embedded ZK quorum in {}", 
numServers, baseDir);
+    Files.createDirectories(baseDir);
+
+    // Phase 1: Reserve random ports for all nodes
+    int[] ports = reservePortPairs(numServers);
+
+    // Build the zkHost string with all ZK ports (Solr port + 1000)
+    StringBuilder zkHostBuilder = new StringBuilder();
+    for (int i = 0; i < numServers; i++) {
+      if (i > 0) {
+        zkHostBuilder.append(",");
+      }
+      int zkPort = ports[i] + 1000;
+      zkHostBuilder.append("127.0.0.1:").append(zkPort);
+    }
+    this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+    if (log.isInfoEnabled()) {
+      log.info("Reserved ports for {} nodes: {}", numServers, 
java.util.Arrays.toString(ports));
+      log.info("ZK connection string: {}", this.zkHost);
+    }
+
+    // Set system properties for embedded ZK quorum mode
+    System.setProperty("solr.zookeeper.server.enabled", "true");
+    System.setProperty("solr.security.manager.enabled", "false");
+    System.setProperty("solr.node.roles", 
"data:on,overseer:allowed,zookeeper_quorum:on");
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+

Review Comment:
   This constructor sets multiple global `System` properties (e.g., 
`solr.zookeeper.server.enabled`, `solr.security.manager.enabled`, 
`solr.node.roles`) but never restores the previous values on shutdown. Since 
MiniSolrCloudCluster is used across many tests in the same JVM, these 
properties can leak into subsequent tests and change their behavior. Capture 
previous values and restore/clear them in `shutdown()` (or avoid using global 
sysprops if possible).



##########
solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java:
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test embedded ZooKeeper running in quorum mode within Solr nodes.
+ *
+ * <p>This test verifies that:
+ *
+ * <ul>
+ *   <li>Multiple Solr nodes can start with embedded ZK in quorum mode
+ *   <li>The ZK quorum forms correctly
+ *   <li>Collections can be created and used
+ *   <li>Documents can be indexed and queried
+ *   <li>All resources are properly closed on shutdown
+ * </ul>
+ */
+@SolrTestCaseJ4.SuppressSSL
+public class TestEmbeddedZkQuorum extends SolrCloudTestCase {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String COLLECTION_NAME = "test_quorum_collection";
+  private static final int NUM_NODES = 3;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    // Disable ZooKeeper JMX to avoid MBean registration conflicts during 
beasting
+    System.setProperty("zookeeper.jmx.log4j.disable", "true");
+
+    // Get path to a test config
+    Path configPath = TEST_PATH().resolve("collection1").resolve("conf");
+
+    // Configure cluster with 3 nodes, each running embedded ZK
+    cluster =
+        configureCluster(NUM_NODES).addConfig("conf1", 
configPath).withEmbeddedZkQuorum().build();
+    cluster.waitForAllNodes(60);
+  }
+
+  @Test
+  public void testBasicQuorumFunctionality()
+      throws IOException, InterruptedException, TimeoutException {
+    for (int i = 0; i < NUM_NODES; i++) {
+      JettySolrRunner node = cluster.getJettySolrRunner(i);
+      assertTrue("Node " + i + " should be running", node.isRunning());
+      assertNotNull("Node " + i + " should have a NodeName", 
node.getNodeName());
+    }
+  }
+
+  @Test
+  public void testCollectionIndexing() throws Exception {
+    try (CloudSolrClient client = cluster.getSolrClient(COLLECTION_NAME)) {
+      CollectionAdminRequest.Create createCmd =
+          CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", 1, 
3);
+      createCmd.process(client);
+      cluster.waitForActiveCollection(COLLECTION_NAME, 1, 3);
+
+      // Index some documents
+      for (int i = 0; i < 10; i++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", i);
+        doc.addField("title_s", "Test Document " + i);
+        doc.addField("content_t", "This is test content for document " + i);
+        client.add(doc);
+      }
+      client.commit();
+
+      // Query the documents
+      SolrQuery query = new SolrQuery("*:*");
+      query.setRows(100);
+      QueryResponse response = client.query(query);
+      SolrDocumentList results = response.getResults();
+
+      // Verify results
+      assertEquals("Should have 10 documents", 10, results.getNumFound());
+
+      CollectionAdminRequest.Delete deleteCmd =
+          CollectionAdminRequest.deleteCollection(COLLECTION_NAME);
+      deleteCmd.process(client);
+    }
+  }
+
+  /**
+   * Tests ZK quorum resilience when a single node fails and recovers.
+   *
+   * <p>This test verifies that:
+   *
+   * <ul>
+   *   <li>A 3-node ZK quorum can lose 1 node and maintain quorum (2/3)
+   *   <li>The cluster continues to accept writes with 2 nodes
+   *   <li>A failed node can rejoin the quorum using the same ports
+   *   <li>All data is preserved after node recovery
+   * </ul>
+   *
+   * <p>This test creates its own private cluster to avoid interfering with 
other tests.
+   */
+  @Test
+  public void testQuorumResilienceWithNodeFailure() throws Exception {
+    final String collectionName = "quorum_resilience";
+    final int initialDocs = 5;
+    final int docsWhileDown = 5;
+    final int docsAfterRecovery = 5;
+
+    // Create a private cluster for this test
+    Path configPath = TEST_PATH().resolve("collection1").resolve("conf");
+    MiniSolrCloudCluster privateCluster =
+        configureCluster(NUM_NODES).addConfig("conf1", 
configPath).withEmbeddedZkQuorum().build();
+
+    try {
+      privateCluster.waitForAllNodes(60);
+
+      // Create collection with replica on each node
+      CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 3)
+          .process(privateCluster.getSolrClient());
+      privateCluster.waitForActiveCollection(collectionName, 1, 3);
+
+      try (CloudSolrClient client = 
privateCluster.getSolrClient(collectionName)) {
+        // Index initial documents and verify
+        indexDocuments(client, 0, initialDocs, "initial");
+        privateCluster.waitForDocCount(
+            collectionName, initialDocs, "initial documents", 120, 
TimeUnit.SECONDS);
+
+        // Stop one node (quorum maintained with 2/3 nodes)
+        JettySolrRunner stoppedNode = privateCluster.getJettySolrRunner(2);
+        String stoppedNodeName = stoppedNode.getNodeName();
+        if (log.isInfoEnabled()) {
+          log.info("Stopping node to test quorum resilience: {}", 
stoppedNodeName);
+        }
+        privateCluster.stopJettySolrRunner(stoppedNode);
+
+        // Wait for ZK to detect node loss and verify cluster still operational
+        privateCluster.waitForLiveNodes(2, 120);
+        indexDocuments(client, initialDocs, docsWhileDown, "during_failure");
+        privateCluster.waitForDocCount(
+            collectionName,
+            initialDocs + docsWhileDown,
+            "documents while node down",
+            120,
+            TimeUnit.SECONDS);
+        if (log.isInfoEnabled()) {
+          log.info("Starting node {} again and testing functionality", 
stoppedNodeName);
+        }
+
+        privateCluster.startJettySolrRunner(stoppedNode, true);
+        privateCluster.waitForNode(stoppedNode, 120);
+
+        // Wait for cluster to stabilize and verify all nodes running
+        privateCluster.waitForLiveNodes(3, 120);
+
+        // CRITICAL: Wait for collection to become active (replicas up, leader 
elected)
+        // before attempting to index documents
+        privateCluster.waitForActiveCollection(collectionName, 120, 
TimeUnit.SECONDS, 1, 3);
+
+        privateCluster.waitForDocCount(
+            collectionName,
+            initialDocs + docsWhileDown,
+            "documents after recovery",
+            120,
+            TimeUnit.SECONDS);
+
+        // Verify full cluster functionality by adding more documents
+        indexDocuments(client, initialDocs + docsWhileDown, docsAfterRecovery, 
"after_recovery");
+        privateCluster.waitForDocCount(
+            collectionName,
+            initialDocs + docsWhileDown + docsAfterRecovery,
+            "all documents",
+            120,
+            TimeUnit.SECONDS);
+      }
+    } finally {
+      CollectionAdminRequest.deleteCollection(collectionName)
+          .process(privateCluster.getSolrClient());
+      privateCluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests ZK quorum loss and recovery when majority of nodes fail.
+   *
+   * <p>This test verifies that:
+   *
+   * <ul>
+   *   <li>A 3-node ZK quorum loses quorum when 2 nodes are down (1/3 
remaining)
+   *   <li>The surviving node maintains its replica but cannot process updates 
without quorum
+   *   <li>Both failed nodes can be restarted to restore quorum
+   *   <li>The cluster becomes operational again (can query and index 
documents)
+   *   <li>Note: After catastrophic failure, some replicas may need time or 
manual intervention to
+   *       fully recover
+   * </ul>
+   *
+   * <p>This test creates its own private cluster to avoid interfering with 
other tests. <b>Hard to
+   * make this test pass</b>
+   */
+  @AwaitsFix(bugUrl = "https://example.com/foo")

Review Comment:
   `@AwaitsFix` uses a placeholder URL (`https://example.com/foo`) and the 
Javadoc says the test is "Hard to make this test pass". In this codebase, 
`@AwaitsFix` links to a real Jira (see other tests). Please either provide the 
real SOLR issue URL explaining the failure mode, or remove/disable the test 
until it’s reliable (otherwise this adds a permanently flaky/ignored test).
   



##########
solr/core/src/java/org/apache/solr/core/ZkContainer.java:
##########
@@ -70,42 +80,105 @@ public class ZkContainer {
   public ZkContainer() {}
 
   public void initZooKeeper(final CoreContainer cc, CloudConfig config) {
-    boolean zkRun = 
EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+    // zkServerEnabled is set whenever in solrCloud mode ('-c') but no 
explicit zkHost/ZK_HOST is
+    // provided.
+    final boolean zkServerEnabled =
+        EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+    boolean zkQuorumNode = false;
+    if 
(NodeRoles.MODE_ON.equals(cc.nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM)))
 {
+      zkQuorumNode = true;
+      log.info("Starting node in ZooKeeper Quorum role.");
+    }
 
-    if (zkRun && config == null)
+    if (zkServerEnabled && config == null) {
       throw new SolrException(
           SolrException.ErrorCode.SERVER_ERROR,
           "Cannot start Solr in cloud mode - no cloud config provided");
+    }
+
+    if (config == null) {
+      log.info("Solr is running in standalone mode");
+      return;
+    }
 
-    if (config == null) return; // not in zk mode
+    final boolean runAsQuorum = config.getZkHost() != null && zkQuorumNode;
 
     String zookeeperHost = config.getZkHost();
+    final var solrHome = cc.getSolrHome();
+    if (zkServerEnabled) {
+      if (!runAsQuorum) {
+        // Old school ZooKeeperServerMain being used under the covers.
+        zkServer =
+            SolrZkServer.createAndStart(config.getZkHost(), solrHome, 
config.getSolrHostPort());
+
+        // set client from server config if not already set
+        if (zookeeperHost == null) {
+          zookeeperHost = zkServer.getClientString();
+        }
+      } else {
+        // ZooKeeperServerEmbedded being used under the covers.
+        // Figure out where to put zoo-data
+        final var zkHomeDir = solrHome.resolve("zoo_home");
+        final var zkDataDir = zkHomeDir.resolve("data");
+
+        // Populate a zoo.cfg
+        final String zooCfgTemplate =
+            ""
+                + "tickTime=2000\n"
+                + "initLimit=10\n"
+                + "syncLimit=5\n"
+                + "dataDir=@@DATA_DIR@@\n"
+                + "4lw.commands.whitelist=mntr,conf,ruok\n"
+                + "admin.enableServer=false\n"
+                + "clientPort=@@ZK_CLIENT_PORT@@\n";
+
+        final int zkPort = config.getSolrHostPort() + 1000;
+        String zooCfgContents =
+            zooCfgTemplate
+                .replace("@@DATA_DIR@@", zkDataDir.toString())
+                .replace("@@ZK_CLIENT_PORT@@", String.valueOf(zkPort));
+        final String[] zkHosts = config.getZkHost().split(",");
+        int myId = -1;
+        final String targetConnStringSection = config.getHost() + ":" + zkPort;
+        if (log.isInfoEnabled()) {
+          log.info(
+              "Trying to match {} against zkHostString {} to determine myid",
+              targetConnStringSection,
+              config.getZkHost());
+        }
+        for (int i = 0; i < zkHosts.length; i++) {
+          final String host = zkHosts[i];
+          if (targetConnStringSection.equals(zkHosts[i])) {
+            myId = (i + 1);
+          }
+          final var hostComponents = host.split(":");
+          final var zkServer = hostComponents[0];
+          final var zkClientPort = Integer.valueOf(hostComponents[1]);
+          final var zkQuorumPort = zkClientPort - 4000;
+          final var zkLeaderPort = zkClientPort - 3000;

Review Comment:
   The quorum/leader ports are derived by subtracting constants (`clientPort - 
4000` / `- 3000`). This doesn’t match the port scheme used elsewhere in Solr’s 
embedded-ZK code (`SolrZkServerProps.injectServers` uses `clientPort+1` and 
`clientPort+2`) and can produce invalid/negative ports depending on the client 
port. This will prevent the ZooKeeper quorum from forming reliably. Use an 
explicit, consistent mapping for peer/election ports (and ensure those ports 
are available).
   



##########
solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java:
##########
@@ -340,6 +346,274 @@ public MiniSolrCloudCluster(
     }
   }
 
+  /**
+   * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each 
Solr node runs its own
+   * embedded ZooKeeper server, and together they form a quorum.
+   *
+   * @param numServers number of Solr servers (must be at least 3 for quorum)
+   * @param baseDir base directory that the mini cluster should be run from
+   * @param solrXml solr.xml file content
+   * @param jettyConfig Jetty configuration
+   * @param securityJson Optional security.json configuration
+   * @param trackJettyMetrics whether to track Jetty metrics
+   * @throws Exception if there was an error starting the cluster
+   */
+  MiniSolrCloudCluster(
+      int numServers,
+      Path baseDir,
+      String solrXml,
+      JettyConfig jettyConfig,
+      Optional<String> securityJson,
+      boolean trackJettyMetrics,
+      boolean useEmbeddedZkQuorum)
+      throws Exception {
+
+    if (!useEmbeddedZkQuorum) {
+      throw new IllegalArgumentException("This constructor is only for 
embedded ZK quorum mode");
+    }
+    if (numServers < 3) {
+      throw new IllegalArgumentException(
+          "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+    }
+
+    Objects.requireNonNull(securityJson);
+    this.baseDir = Objects.requireNonNull(baseDir);
+    this.jettyConfig = Objects.requireNonNull(jettyConfig);
+    this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+    this.trackJettyMetrics = trackJettyMetrics;
+    this.externalZkServer = true; // No ZkTestServer in quorum mode
+    this.zkServer = null; // No single ZK server
+
+    log.info("Starting cluster of {} servers with embedded ZK quorum in {}", 
numServers, baseDir);
+    Files.createDirectories(baseDir);
+
+    // Phase 1: Reserve random ports for all nodes
+    int[] ports = reservePortPairs(numServers);
+
+    // Build the zkHost string with all ZK ports (Solr port + 1000)
+    StringBuilder zkHostBuilder = new StringBuilder();
+    for (int i = 0; i < numServers; i++) {
+      if (i > 0) {
+        zkHostBuilder.append(",");
+      }
+      int zkPort = ports[i] + 1000;
+      zkHostBuilder.append("127.0.0.1:").append(zkPort);
+    }
+    this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+    if (log.isInfoEnabled()) {
+      log.info("Reserved ports for {} nodes: {}", numServers, 
java.util.Arrays.toString(ports));
+      log.info("ZK connection string: {}", this.zkHost);
+    }
+
+    // Set system properties for embedded ZK quorum mode
+    System.setProperty("solr.zookeeper.server.enabled", "true");
+    System.setProperty("solr.security.manager.enabled", "false");
+    System.setProperty("solr.node.roles", 
"data:on,overseer:allowed,zookeeper_quorum:on");
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
+    // Phase 2: Start all nodes in parallel
+    List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
+    for (int i = 0; i < numServers; i++) {
+      final int solrPort = ports[i];
+      final String nodeName = newNodeName();
+      startups.add(
+          () -> {
+            Path runnerPath = createInstancePath(nodeName);
+            Files.write(runnerPath.resolve("solr.xml"), 
solrXml.getBytes(StandardCharsets.UTF_8));
+
+            Properties nodeProps = new Properties();
+            nodeProps.setProperty("zkHost", this.zkHost);
+            nodeProps.setProperty("hostPort", String.valueOf(solrPort));
+
+            JettyConfig newConfig = 
JettyConfig.builder(jettyConfig).setPort(solrPort).build();
+
+            JettySolrRunner jetty =
+                !trackJettyMetrics
+                    ? new JettySolrRunner(runnerPath.toString(), nodeProps, 
newConfig)
+                    : new JettySolrRunnerWithMetrics(runnerPath.toString(), 
nodeProps, newConfig);
+
+            int zkPort = solrPort + 1000;
+            log.info("Starting {} on port {} with ZK on port {}", nodeName, 
solrPort, zkPort);
+            jetty.start();
+            log.info("Node {} started successfully", nodeName);
+
+            jettys.add(jetty);
+            synchronized (startupWait) {
+              startupWait.notifyAll();
+            }
+            return jetty;
+          });
+    }
+
+    final ExecutorService executorLauncher =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new 
SolrNamedThreadFactory("jetty-launcher"));
+    Collection<Future<JettySolrRunner>> futures = 
executorLauncher.invokeAll(startups);
+    ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+    Exception startupError =
+        checkForExceptions(
+            "Error starting up MiniSolrCloudCluster with embedded ZK quorum", 
futures);
+    if (startupError != null) {
+      try {
+        this.shutdown();
+      } catch (Throwable t) {
+        startupError.addSuppressed(t);
+      }
+      throw startupError;
+    }
+
+    log.info("All {} nodes started, waiting for quorum formation...", 
numServers);
+    Thread.sleep(10000); // Wait for ZK quorum to fully form
+
+    // Initialize ZK paths and security (if provided)
+    try (SolrZkClient zkClient =
+        new SolrZkClient.Builder()
+            .withUrl(this.zkHost)
+            .withTimeout(60000, TimeUnit.MILLISECONDS)
+            .build()) {
+      if (!zkClient.exists("/solr")) {
+        zkClient.makePath("/solr", true);
+      }
+
+      if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
+        zkClient.makePath(
+            "/solr" + ZkStateReader.CLUSTER_PROPS,
+            "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8),
+            true);
+      }
+      if (securityJson.isPresent()) {
+        zkClient.makePath(
+            "/solr/security.json", 
securityJson.get().getBytes(Charset.defaultCharset()), true);
+      }
+    }
+
+    solrClient = buildSolrClientForQuorum(this.zkHost);
+
+    if (numServers > 0) {
+      waitForAllNodes(numServers, 60);
+    }
+
+    log.info("Embedded ZK quorum cluster started successfully with {} nodes", 
numServers);
+  }
+
+  /**
+   * Reserves port pairs for embedded ZK quorum mode. For each node, we need 
both a Solr port and a
+   * ZK port (Solr port + 1000). This method ensures both ports in each pair 
are available before
+   * returning.
+   *
+   * <p>The method keeps all ServerSockets open during the search to prevent 
race conditions where
+   * another process might grab a port between our check and actual usage.
+   *
+   * @param numPairs the number of port pairs to reserve
+   * @return array of Solr ports (ZK ports are Solr port + 1000)
+   * @throws IOException if unable to find enough available port pairs
+   */
+  private int[] reservePortPairs(int numPairs) throws IOException {
+    List<ServerSocket> solrSockets = new ArrayList<>();
+    List<ServerSocket> zkSockets = new ArrayList<>();
+    int[] ports = new int[numPairs];
+
+    try {
+      int pairsFound = 0;
+      int maxAttempts = numPairs * 100; // Reasonable limit to avoid infinite 
loops
+      int attempts = 0;
+
+      while (pairsFound < numPairs && attempts < maxAttempts) {
+        attempts++;
+        ServerSocket solrSocket = null;
+        ServerSocket zkSocket = null;
+
+        try {
+          // Try to get a random available port for Solr
+          solrSocket = new ServerSocket(0);
+          int solrPort = solrSocket.getLocalPort();
+          int zkPort = solrPort + 1000;
+
+          // Check if ZK port would exceed the valid port range (0-65535)
+          if (zkPort > 65535) {
+            solrSocket.close();
+            continue; // Skip this port and try again
+          }
+
+          // Verify the corresponding ZK port is also available
+          zkSocket = new ServerSocket(zkPort);
+
+          // Both ports are available - keep the sockets and record the port
+          solrSockets.add(solrSocket);
+          zkSockets.add(zkSocket);
+          ports[pairsFound] = solrPort;
+          pairsFound++;
+
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Reserved port pair {}/{}: Solr={}, ZK={}", pairsFound, 
numPairs, solrPort, zkPort);
+          }
+
+        } catch (IOException | IllegalArgumentException e) {
+          // ZK port was not available or invalid, close sockets and try again
+          if (solrSocket != null) {
+            try {
+              solrSocket.close();
+            } catch (IOException ignored) {
+            }
+          }
+          if (zkSocket != null) {
+            try {
+              zkSocket.close();
+            } catch (IOException ignored) {
+            }
+          }

Review Comment:
   `reservePortPairs` only verifies availability of the Solr port and the 
ZooKeeper client port (`solrPort+1000`). A ZooKeeper quorum peer also needs 
additional ports for quorum/election traffic (typically `clientPort+1` and 
`clientPort+2`; in this PR `ZkContainer` derives them as `clientPort - 4000` 
and `clientPort - 3000`). If those ports are in use, the embedded ZK server can 
fail to bind even though this method succeeded; consider reserving/checking 
those ports too (or making the port scheme explicit).
   



##########
solr/core/src/java/org/apache/solr/core/ZkContainer.java:
##########
@@ -70,42 +80,105 @@ public class ZkContainer {
   public ZkContainer() {}
 
   public void initZooKeeper(final CoreContainer cc, CloudConfig config) {
-    boolean zkRun = 
EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+    // zkServerEnabled is set whenever in solrCloud mode ('-c') but no 
explicit zkHost/ZK_HOST is
+    // provided.
+    final boolean zkServerEnabled =
+        EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false);
+    boolean zkQuorumNode = false;
+    if 
(NodeRoles.MODE_ON.equals(cc.nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM)))
 {
+      zkQuorumNode = true;
+      log.info("Starting node in ZooKeeper Quorum role.");
+    }
 
-    if (zkRun && config == null)
+    if (zkServerEnabled && config == null) {
       throw new SolrException(
           SolrException.ErrorCode.SERVER_ERROR,
           "Cannot start Solr in cloud mode - no cloud config provided");
+    }
+
+    if (config == null) {
+      log.info("Solr is running in standalone mode");
+      return;
+    }
 
-    if (config == null) return; // not in zk mode
+    final boolean runAsQuorum = config.getZkHost() != null && zkQuorumNode;
 
     String zookeeperHost = config.getZkHost();
+    final var solrHome = cc.getSolrHome();
+    if (zkServerEnabled) {
+      if (!runAsQuorum) {
+        // Old school ZooKeeperServerMain being used under the covers.
+        zkServer =
+            SolrZkServer.createAndStart(config.getZkHost(), solrHome, 
config.getSolrHostPort());
+
+        // set client from server config if not already set
+        if (zookeeperHost == null) {
+          zookeeperHost = zkServer.getClientString();
+        }
+      } else {
+        // ZooKeeperServerEmbedded being used under the covers.
+        // Figure out where to put zoo-data
+        final var zkHomeDir = solrHome.resolve("zoo_home");
+        final var zkDataDir = zkHomeDir.resolve("data");
+
+        // Populate a zoo.cfg
+        final String zooCfgTemplate =
+            ""
+                + "tickTime=2000\n"
+                + "initLimit=10\n"
+                + "syncLimit=5\n"
+                + "dataDir=@@DATA_DIR@@\n"
+                + "4lw.commands.whitelist=mntr,conf,ruok\n"
+                + "admin.enableServer=false\n"
+                + "clientPort=@@ZK_CLIENT_PORT@@\n";
+
+        final int zkPort = config.getSolrHostPort() + 1000;
+        String zooCfgContents =
+            zooCfgTemplate
+                .replace("@@DATA_DIR@@", zkDataDir.toString())
+                .replace("@@ZK_CLIENT_PORT@@", String.valueOf(zkPort));
+        final String[] zkHosts = config.getZkHost().split(",");
+        int myId = -1;
+        final String targetConnStringSection = config.getHost() + ":" + zkPort;
+        if (log.isInfoEnabled()) {
+          log.info(
+              "Trying to match {} against zkHostString {} to determine myid",
+              targetConnStringSection,
+              config.getZkHost());
+        }
+        for (int i = 0; i < zkHosts.length; i++) {
+          final String host = zkHosts[i];
+          if (targetConnStringSection.equals(zkHosts[i])) {
+            myId = (i + 1);
+          }
+          final var hostComponents = host.split(":");
+          final var zkServer = hostComponents[0];
+          final var zkClientPort = Integer.valueOf(hostComponents[1]);
+          final var zkQuorumPort = zkClientPort - 4000;
+          final var zkLeaderPort = zkClientPort - 3000;
+          final String configEntry =
+              "server." + (i + 1) + "=" + zkServer + ":" + zkQuorumPort + ":" 
+ zkLeaderPort + "\n";
+          zooCfgContents = zooCfgContents + configEntry;
+        }
+
+        if (myId == -1) {
+          throw new IllegalStateException(
+              "Unable to determine ZK 'myid' for target " + 
targetConnStringSection);
+        }

Review Comment:
   `myId` detection requires an exact string match between 
`config.getHost() + ":" + zkPort` and a complete `host:port` entry of `zkHost`. 
This will fail for common cases like `zkHost` using `localhost` while Solr’s 
configured host is `127.0.0.1`, causing startup to abort with `Unable to 
determine ZK 'myid'`. Consider matching by port (and/or resolving 
hostnames/loopback equivalence) instead of requiring exact host string equality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to