janhoy commented on code in PR #4050:
URL: https://github.com/apache/solr/pull/4050#discussion_r2693720193


##########
solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.net.http.HttpConnectTimeoutException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.embedded.JettyConfig;
+import org.apache.solr.util.SolrJettyTestRule;
+import org.eclipse.jetty.ee10.servlet.ServletHolder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConcurrentUpdateSolrClientTestBase extends 
SolrTestCaseJ4 {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs);
+
+  public abstract ConcurrentUpdateBaseSolrClient concurrentClient(
+      HttpSolrClientBase solrClient,
+      String baseUrl,
+      String defaultCollection,
+      int queueSize,
+      int threadCount,
+      boolean disablePollQueue);
+
+  public abstract ConcurrentUpdateBaseSolrClient 
outcomeCountingConcurrentClient(
+      String serverUrl,
+      int queueSize,
+      int threadCount,
+      HttpSolrClientBase solrClient,
+      AtomicInteger successCounter,
+      AtomicInteger failureCounter,
+      StringBuilder errors);
+
+  /** Mock endpoint where the CUSS being tested in this class sends requests. 
*/
+  public static class TestServlet extends HttpServlet
+      implements JavaBinUpdateRequestCodec.StreamingUpdateHandler {
+    private static final long serialVersionUID = 1L;
+
+    public static void clear() {
+      lastMethod = null;
+      headers = null;
+      parameters = null;
+      errorCode = null;
+      numReqsRcvd.set(0);
+      numDocsRcvd.set(0);
+    }
+
+    public static Integer errorCode = null;
+    public static String lastMethod = null;
+    public static HashMap<String, String> headers = null;
+    public static Map<String, String[]> parameters = null;
+    public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
+    public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
+
+    public static void setErrorCode(Integer code) {
+      errorCode = code;
+    }
+
+    private void setHeaders(HttpServletRequest req) {
+      Enumeration<String> headerNames = req.getHeaderNames();
+      headers = new HashMap<>();
+      while (headerNames.hasMoreElements()) {
+        final String name = headerNames.nextElement();
+        headers.put(name, req.getHeader(name));
+      }
+    }
+
+    private void setParameters(HttpServletRequest req) {
+      // parameters = req.getParameterMap();
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+
+      numReqsRcvd.incrementAndGet();
+      lastMethod = "post";
+      recordRequest(req, resp);
+
+      InputStream reqIn = req.getInputStream();
+      JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
+      for (; ; ) {
+        try {
+          javabin.unmarshal(reqIn, this);
+        } catch (EOFException e) {
+          break; // this is expected
+        }
+      }
+    }
+
+    private void recordRequest(HttpServletRequest req, HttpServletResponse 
resp) {
+      setHeaders(req);
+      setParameters(req);
+      if (null != errorCode) {
+        try {
+          resp.sendError(errorCode);
+        } catch (IOException e) {
+          throw new RuntimeException("sendError IO fail in TestServlet", e);
+        }
+      }
+    }
+
+    @Override
+    public void update(
+        SolrInputDocument document, UpdateRequest req, Integer commitWithin, 
Boolean override) {
+      numDocsRcvd.incrementAndGet();
+    }
+  } // end TestServlet
+
+  static class SendDocsRunnable implements Runnable {
+
+    private String id;
+    private int numDocs;
+    private SolrClient cuss;
+    private String collection;
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss) {
+      this(id, numDocs, cuss, null);
+    }
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss, String 
collection) {
+      this.id = id;
+      this.numDocs = numDocs;
+      this.cuss = cuss;
+      this.collection = collection;
+    }
+
+    @Override
+    public void run() {
+      for (int d = 0; d < numDocs; d++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        String docId = id + "_" + d;
+        doc.setField("id", docId);
+        UpdateRequest req = new UpdateRequest();
+        req.add(doc);
+        try {
+          if (this.collection == null) cuss.request(req);
+          else cuss.request(req, this.collection);
+        } catch (Throwable t) {
+          log.error("error making request", t);
+        }
+      }
+    }
+  }
+
+  @ClassRule public static SolrJettyTestRule solrTestRule = new 
SolrJettyTestRule();
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    JettyConfig jettyConfig =
+        JettyConfig.builder().withServlet(new 
ServletHolder(TestServlet.class), "/cuss/*").build();
+    solrTestRule.startSolr(legacyExampleCollection1SolrHome(), new 
Properties(), jettyConfig);
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {}
+
+  @Test
+  public void testConcurrentUpdate() throws Exception {
+    TestServlet.clear();
+
+    String serverUrl = solrTestRule.getBaseUrl() + "/cuss/foo";
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+
+    // for tracking callbacks from CUSS
+    final AtomicInteger successCounter = new AtomicInteger(0);
+    final AtomicInteger errorCounter = new AtomicInteger(0);
+    final StringBuilder errors = new StringBuilder();
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            outcomeCountingConcurrentClient(
+                serverUrl,
+                cussQueueSize,
+                cussThreadCount,
+                http2Client,
+                successCounter,
+                errorCounter,
+                errors)) {
+
+      // ensure it doesn't block where there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      int poolSize = 5;
+      ExecutorService threadPool =
+          ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new 
SolrNamedThreadFactory("testCUSS"));
+
+      int numDocs = 100;
+      int numRunnables = 5;
+      for (int r = 0; r < numRunnables; r++)
+        threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, 
concurrentClient));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      // wait until all requests are processed by CUSS
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+
+      assertEquals("post", TestServlet.lastMethod);
+
+      // expect all requests to be successful
+      int expectedSuccesses = TestServlet.numReqsRcvd.get();
+      assertTrue(expectedSuccesses > 0); // at least one request must have 
been sent
+
+      assertEquals(
+          "Expected no errors but got " + errorCounter.get() + ", due to: " + 
errors.toString(),
+          0,
+          errorCounter.get());
+      assertEquals(
+          "Expected " + expectedSuccesses + " successes, but got " + 
successCounter.get(),
+          successCounter.get(),
+          expectedSuccesses);
+
+      int expectedDocs = numDocs * numRunnables;
+      assertEquals(
+          "Expected CUSS to send " + expectedDocs + " but got " + 
TestServlet.numDocsRcvd.get(),
+          TestServlet.numDocsRcvd.get(),
+          expectedDocs);
+    }
+  }
+
+  @Test
+  public void testCollectionParameters() throws IOException, 
SolrServerException {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 10;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,

Review Comment:
   Swap the order of these two arguments: the signature expects `queueSize, threadCount`.



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.http.HttpResponse;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */
+public class ConcurrentUpdateJdkSolrClient extends 
ConcurrentUpdateBaseSolrClient {
+
+  private final HttpJdkSolrClient client;
+
+  protected 
ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) {
+    super(builder);
+    this.client = (HttpJdkSolrClient) builder.getClient();
+  }
+
+  @Override
+  protected StreamingResponse doSendUpdateStream(Update update) {
+    UpdateRequest req = update.request();
+    String collection = update.collection();
+    CompletableFuture<HttpResponse<InputStream>> resp =
+        client.requestInputStreamAsync(basePath, req, collection);
+
+    return new StreamingResponse() {
+
+      @Override
+      public int awaitResponse(long timeoutMillis) throws Exception {
+        return resp.get(timeoutMillis, TimeUnit.MILLISECONDS).statusCode();
+      }
+
+      @Override
+      public InputStream getInputStream() {
+        try {
+          return resp.get().body();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return InputStream.nullInputStream();
+        } catch (ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public Object getUnderlyingResponse() {
+        return resp;
+      }
+
+      @Override
+      public void close() throws IOException {
+        getInputStream().close();
+      }
+    };
+  }
+
+  public static class Builder extends ConcurrentUpdateBaseSolrClient.Builder {
+    /**
+     * @see 
org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient.Builder#Builder(String,
+     *     HttpSolrClientBase)
+     */
+    public Builder(String baseUrl, HttpJdkSolrClient client) {
+
+      this(baseUrl, client, false);
+      // The base class uses idle timeout with StreamingResponse#awaitResponse 
so it needs to be
+      // set!
+      this.idleTimeoutMillis = 1000;

Review Comment:
   Why is this not configurable? 
   
   When implementing the tikaserver code, we had a bug where we configured the
   connectTimeout but left the idleTimeout at its default value (5s), which was
   too low. Solr would send the payload to tikaServer, and in some cases Tika
   would spend more than 5s parsing the document and the connection became idle.
   If we need to hardcode a value here, it should likely be >1s.



##########
solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.net.http.HttpConnectTimeoutException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.embedded.JettyConfig;
+import org.apache.solr.util.SolrJettyTestRule;
+import org.eclipse.jetty.ee10.servlet.ServletHolder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConcurrentUpdateSolrClientTestBase extends 
SolrTestCaseJ4 {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs);
+
+  public abstract ConcurrentUpdateBaseSolrClient concurrentClient(
+      HttpSolrClientBase solrClient,
+      String baseUrl,
+      String defaultCollection,
+      int queueSize,
+      int threadCount,
+      boolean disablePollQueue);
+
+  public abstract ConcurrentUpdateBaseSolrClient 
outcomeCountingConcurrentClient(
+      String serverUrl,
+      int queueSize,
+      int threadCount,
+      HttpSolrClientBase solrClient,
+      AtomicInteger successCounter,
+      AtomicInteger failureCounter,
+      StringBuilder errors);
+
+  /** Mock endpoint where the CUSS being tested in this class sends requests. 
*/
+  public static class TestServlet extends HttpServlet
+      implements JavaBinUpdateRequestCodec.StreamingUpdateHandler {
+    private static final long serialVersionUID = 1L;
+
+    public static void clear() {
+      lastMethod = null;
+      headers = null;
+      parameters = null;
+      errorCode = null;
+      numReqsRcvd.set(0);
+      numDocsRcvd.set(0);
+    }
+
+    public static Integer errorCode = null;
+    public static String lastMethod = null;
+    public static HashMap<String, String> headers = null;
+    public static Map<String, String[]> parameters = null;
+    public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
+    public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
+
+    public static void setErrorCode(Integer code) {
+      errorCode = code;
+    }
+
+    private void setHeaders(HttpServletRequest req) {
+      Enumeration<String> headerNames = req.getHeaderNames();
+      headers = new HashMap<>();
+      while (headerNames.hasMoreElements()) {
+        final String name = headerNames.nextElement();
+        headers.put(name, req.getHeader(name));
+      }
+    }
+
+    private void setParameters(HttpServletRequest req) {
+      // parameters = req.getParameterMap();
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+
+      numReqsRcvd.incrementAndGet();
+      lastMethod = "post";
+      recordRequest(req, resp);
+
+      InputStream reqIn = req.getInputStream();
+      JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
+      for (; ; ) {
+        try {
+          javabin.unmarshal(reqIn, this);
+        } catch (EOFException e) {
+          break; // this is expected
+        }
+      }
+    }
+
+    private void recordRequest(HttpServletRequest req, HttpServletResponse 
resp) {
+      setHeaders(req);
+      setParameters(req);
+      if (null != errorCode) {
+        try {
+          resp.sendError(errorCode);
+        } catch (IOException e) {
+          throw new RuntimeException("sendError IO fail in TestServlet", e);
+        }
+      }
+    }
+
+    @Override
+    public void update(
+        SolrInputDocument document, UpdateRequest req, Integer commitWithin, 
Boolean override) {
+      numDocsRcvd.incrementAndGet();
+    }
+  } // end TestServlet
+
+  static class SendDocsRunnable implements Runnable {
+
+    private String id;
+    private int numDocs;
+    private SolrClient cuss;
+    private String collection;
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss) {
+      this(id, numDocs, cuss, null);
+    }
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss, String 
collection) {
+      this.id = id;
+      this.numDocs = numDocs;
+      this.cuss = cuss;
+      this.collection = collection;
+    }
+
+    @Override
+    public void run() {
+      for (int d = 0; d < numDocs; d++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        String docId = id + "_" + d;
+        doc.setField("id", docId);
+        UpdateRequest req = new UpdateRequest();
+        req.add(doc);
+        try {
+          if (this.collection == null) cuss.request(req);
+          else cuss.request(req, this.collection);
+        } catch (Throwable t) {
+          log.error("error making request", t);
+        }
+      }
+    }
+  }
+
+  @ClassRule public static SolrJettyTestRule solrTestRule = new 
SolrJettyTestRule();
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    JettyConfig jettyConfig =
+        JettyConfig.builder().withServlet(new 
ServletHolder(TestServlet.class), "/cuss/*").build();
+    solrTestRule.startSolr(legacyExampleCollection1SolrHome(), new 
Properties(), jettyConfig);
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {}
+
+  @Test
+  public void testConcurrentUpdate() throws Exception {
+    TestServlet.clear();
+
+    String serverUrl = solrTestRule.getBaseUrl() + "/cuss/foo";
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+
+    // for tracking callbacks from CUSS
+    final AtomicInteger successCounter = new AtomicInteger(0);
+    final AtomicInteger errorCounter = new AtomicInteger(0);
+    final StringBuilder errors = new StringBuilder();
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            outcomeCountingConcurrentClient(
+                serverUrl,
+                cussQueueSize,
+                cussThreadCount,
+                http2Client,
+                successCounter,
+                errorCounter,
+                errors)) {
+
+      // ensure it doesn't block where there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      int poolSize = 5;
+      ExecutorService threadPool =
+          ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new 
SolrNamedThreadFactory("testCUSS"));
+
+      int numDocs = 100;
+      int numRunnables = 5;
+      for (int r = 0; r < numRunnables; r++)
+        threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, 
concurrentClient));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      // wait until all requests are processed by CUSS
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+
+      assertEquals("post", TestServlet.lastMethod);
+
+      // expect all requests to be successful
+      int expectedSuccesses = TestServlet.numReqsRcvd.get();
+      assertTrue(expectedSuccesses > 0); // at least one request must have 
been sent
+
+      assertEquals(
+          "Expected no errors but got " + errorCounter.get() + ", due to: " + 
errors.toString(),
+          0,
+          errorCounter.get());
+      assertEquals(
+          "Expected " + expectedSuccesses + " successes, but got " + 
successCounter.get(),
+          successCounter.get(),
+          expectedSuccesses);
+
+      int expectedDocs = numDocs * numRunnables;
+      assertEquals(
+          "Expected CUSS to send " + expectedDocs + " but got " + 
TestServlet.numDocsRcvd.get(),
+          TestServlet.numDocsRcvd.get(),
+          expectedDocs);
+    }
+  }
+
+  @Test
+  public void testCollectionParameters() throws IOException, 
SolrServerException {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 10;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "collection");
+      concurrentClient.add("collection1", doc);
+      concurrentClient.commit("collection1");
+
+      assertEquals(
+          1,
+          concurrentClient
+              .query("collection1", new SolrQuery("id:collection"))
+              .getResults()
+              .getNumFound());
+    }
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                DEFAULT_TEST_CORENAME,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      assertEquals(
+          1, concurrentClient.query(new 
SolrQuery("id:collection")).getResults().getNumFound());
+    }
+  }
+
+  @Test
+  public void testConcurrentCollectionUpdate() throws Exception {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+    int numDocs = 100;
+    int numRunnables = 5;
+    int expected = numDocs * numRunnables;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,
+                true)) {
+
+      // ensure it doesn't block where there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      // Delete all existing documents.
+      concurrentClient.deleteByQuery("collection1", "*:*");
+
+      int poolSize = 5;
+      ExecutorService threadPool =
+          ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new 
SolrNamedThreadFactory("testCUSS"));
+
+      for (int r = 0; r < numRunnables; r++)
+        threadPool.execute(
+            new SendDocsRunnable(String.valueOf(r), numDocs, concurrentClient, 
"collection1"));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      concurrentClient.commit("collection1");
+
+      assertEquals(
+          expected,
+          concurrentClient.query("collection1", new 
SolrQuery("*:*")).getResults().getNumFound());
+
+      // wait until all requests are processed by CUSS
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+    }
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                DEFAULT_TEST_CORENAME,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      assertEquals(
+          expected, concurrentClient.query(new 
SolrQuery("*:*")).getResults().getNumFound());
+    }
+  }
+
+  /**
+   * Test that connection timeout information is passed to the HttpSolrClient 
that handles non add
+   * operations.
+   */
+  @Test(timeout = 10000)
+  public void testSocketTimeoutOnCommit() throws IOException, 
SolrServerException {
+    InetAddress localHost = InetAddress.getLocalHost();
+    try (ServerSocket server = new ServerSocket(0, 1, localHost);
+        var http2Client = solrClient(1);
+        var client =
+            concurrentClient(
+                http2Client,
+                "http://";
+                    + localHost.getHostAddress()
+                    + ":"
+                    + server.getLocalPort()
+                    + "/noOneThere",
+                null,
+                10,
+                1,

Review Comment:
   Is this the correct order, i.e. `queueSize=10, threadCount=1`?



##########
solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.net.http.HttpConnectTimeoutException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.embedded.JettyConfig;
+import org.apache.solr.util.SolrJettyTestRule;
+import org.eclipse.jetty.ee10.servlet.ServletHolder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConcurrentUpdateSolrClientTestBase extends 
SolrTestCaseJ4 {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs);
+
+  public abstract ConcurrentUpdateBaseSolrClient concurrentClient(
+      HttpSolrClientBase solrClient,
+      String baseUrl,
+      String defaultCollection,
+      int queueSize,
+      int threadCount,
+      boolean disablePollQueue);
+
+  public abstract ConcurrentUpdateBaseSolrClient 
outcomeCountingConcurrentClient(
+      String serverUrl,
+      int queueSize,
+      int threadCount,
+      HttpSolrClientBase solrClient,
+      AtomicInteger successCounter,
+      AtomicInteger failureCounter,
+      StringBuilder errors);
+
+  /** Mock endpoint where the CUSS being tested in this class sends requests. 
*/
+  public static class TestServlet extends HttpServlet
+      implements JavaBinUpdateRequestCodec.StreamingUpdateHandler {
+    private static final long serialVersionUID = 1L;
+
+    public static void clear() {
+      lastMethod = null;
+      headers = null;
+      parameters = null;
+      errorCode = null;
+      numReqsRcvd.set(0);
+      numDocsRcvd.set(0);
+    }
+
+    public static Integer errorCode = null;
+    public static String lastMethod = null;
+    public static HashMap<String, String> headers = null;
+    public static Map<String, String[]> parameters = null;
+    public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
+    public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
+
+    public static void setErrorCode(Integer code) {
+      errorCode = code;
+    }
+
+    private void setHeaders(HttpServletRequest req) {
+      Enumeration<String> headerNames = req.getHeaderNames();
+      headers = new HashMap<>();
+      while (headerNames.hasMoreElements()) {
+        final String name = headerNames.nextElement();
+        headers.put(name, req.getHeader(name));
+      }
+    }
+
+    private void setParameters(HttpServletRequest req) {
+      // parameters = req.getParameterMap();
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+
+      numReqsRcvd.incrementAndGet();
+      lastMethod = "post";
+      recordRequest(req, resp);
+
+      InputStream reqIn = req.getInputStream();
+      JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
+      for (; ; ) {
+        try {
+          javabin.unmarshal(reqIn, this);
+        } catch (EOFException e) {
+          break; // this is expected
+        }
+      }
+    }
+
+    private void recordRequest(HttpServletRequest req, HttpServletResponse 
resp) {
+      setHeaders(req);
+      setParameters(req);
+      if (null != errorCode) {
+        try {
+          resp.sendError(errorCode);
+        } catch (IOException e) {
+          throw new RuntimeException("sendError IO fail in TestServlet", e);
+        }
+      }
+    }
+
+    @Override
+    public void update(
+        SolrInputDocument document, UpdateRequest req, Integer commitWithin, 
Boolean override) {
+      numDocsRcvd.incrementAndGet();
+    }
+  } // end TestServlet
+
+  static class SendDocsRunnable implements Runnable {
+
+    private String id;
+    private int numDocs;
+    private SolrClient cuss;
+    private String collection;
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss) {
+      this(id, numDocs, cuss, null);
+    }
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss, String 
collection) {
+      this.id = id;
+      this.numDocs = numDocs;
+      this.cuss = cuss;
+      this.collection = collection;
+    }
+
+    @Override
+    public void run() {
+      for (int d = 0; d < numDocs; d++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        String docId = id + "_" + d;
+        doc.setField("id", docId);
+        UpdateRequest req = new UpdateRequest();
+        req.add(doc);
+        try {
+          if (this.collection == null) cuss.request(req);
+          else cuss.request(req, this.collection);
+        } catch (Throwable t) {
+          log.error("error making request", t);
+        }
+      }
+    }
+  }
+
+  @ClassRule public static SolrJettyTestRule solrTestRule = new 
SolrJettyTestRule();
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    JettyConfig jettyConfig =
+        JettyConfig.builder().withServlet(new 
ServletHolder(TestServlet.class), "/cuss/*").build();
+    solrTestRule.startSolr(legacyExampleCollection1SolrHome(), new 
Properties(), jettyConfig);
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {}
+
+  @Test
+  public void testConcurrentUpdate() throws Exception {
+    TestServlet.clear();
+
+    String serverUrl = solrTestRule.getBaseUrl() + "/cuss/foo";
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+
+    // for tracking callbacks from CUSS
+    final AtomicInteger successCounter = new AtomicInteger(0);
+    final AtomicInteger errorCounter = new AtomicInteger(0);
+    final StringBuilder errors = new StringBuilder();
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            outcomeCountingConcurrentClient(
+                serverUrl,
+                cussQueueSize,
+                cussThreadCount,
+                http2Client,
+                successCounter,
+                errorCounter,
+                errors)) {
+
+      // ensure it doesn't block where there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      int poolSize = 5;
+      ExecutorService threadPool =
+          ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new 
SolrNamedThreadFactory("testCUSS"));
+
+      int numDocs = 100;
+      int numRunnables = 5;
+      for (int r = 0; r < numRunnables; r++)
+        threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, 
concurrentClient));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      // wait until all requests are processed by CUSS
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+
+      assertEquals("post", TestServlet.lastMethod);
+
+      // expect all requests to be successful
+      int expectedSuccesses = TestServlet.numReqsRcvd.get();
+      assertTrue(expectedSuccesses > 0); // at least one request must have 
been sent
+
+      assertEquals(
+          "Expected no errors but got " + errorCounter.get() + ", due to: " + 
errors.toString(),
+          0,
+          errorCounter.get());
+      assertEquals(
+          "Expected " + expectedSuccesses + " successes, but got " + 
successCounter.get(),
+          successCounter.get(),
+          expectedSuccesses);
+
+      int expectedDocs = numDocs * numRunnables;
+      assertEquals(
+          "Expected CUSS to send " + expectedDocs + " but got " + 
TestServlet.numDocsRcvd.get(),
+          TestServlet.numDocsRcvd.get(),
+          expectedDocs);
+    }
+  }
+
+  @Test
+  public void testCollectionParameters() throws IOException, 
SolrServerException {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 10;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "collection");
+      concurrentClient.add("collection1", doc);
+      concurrentClient.commit("collection1");
+
+      assertEquals(
+          1,
+          concurrentClient
+              .query("collection1", new SolrQuery("id:collection"))
+              .getResults()
+              .getNumFound());
+    }
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                DEFAULT_TEST_CORENAME,
+                cussThreadCount,
+                cussQueueSize,

Review Comment:
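   Swap the order of these two arguments: `concurrentClient(...)` declares `queueSize, threadCount`, but `cussThreadCount` is passed before `cussQueueSize` here.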



##########
solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.net.http.HttpConnectTimeoutException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.embedded.JettyConfig;
+import org.apache.solr.util.SolrJettyTestRule;
+import org.eclipse.jetty.ee10.servlet.ServletHolder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConcurrentUpdateSolrClientTestBase extends 
SolrTestCaseJ4 {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs);
+
+  public abstract ConcurrentUpdateBaseSolrClient concurrentClient(
+      HttpSolrClientBase solrClient,
+      String baseUrl,
+      String defaultCollection,
+      int queueSize,
+      int threadCount,
+      boolean disablePollQueue);
+
+  public abstract ConcurrentUpdateBaseSolrClient 
outcomeCountingConcurrentClient(
+      String serverUrl,
+      int queueSize,
+      int threadCount,
+      HttpSolrClientBase solrClient,
+      AtomicInteger successCounter,
+      AtomicInteger failureCounter,
+      StringBuilder errors);
+
+  /** Mock endpoint where the CUSS being tested in this class sends requests. 
*/
+  public static class TestServlet extends HttpServlet
+      implements JavaBinUpdateRequestCodec.StreamingUpdateHandler {
+    private static final long serialVersionUID = 1L;
+
+    public static void clear() {
+      lastMethod = null;
+      headers = null;
+      parameters = null;
+      errorCode = null;
+      numReqsRcvd.set(0);
+      numDocsRcvd.set(0);
+    }
+
+    public static Integer errorCode = null;
+    public static String lastMethod = null;
+    public static HashMap<String, String> headers = null;
+    public static Map<String, String[]> parameters = null;
+    public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
+    public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
+
+    public static void setErrorCode(Integer code) {
+      errorCode = code;
+    }
+
+    private void setHeaders(HttpServletRequest req) {
+      Enumeration<String> headerNames = req.getHeaderNames();
+      headers = new HashMap<>();
+      while (headerNames.hasMoreElements()) {
+        final String name = headerNames.nextElement();
+        headers.put(name, req.getHeader(name));
+      }
+    }
+
+    private void setParameters(HttpServletRequest req) {
+      // parameters = req.getParameterMap();
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+
+      numReqsRcvd.incrementAndGet();
+      lastMethod = "post";
+      recordRequest(req, resp);
+
+      InputStream reqIn = req.getInputStream();
+      JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
+      for (; ; ) {
+        try {
+          javabin.unmarshal(reqIn, this);
+        } catch (EOFException e) {
+          break; // this is expected
+        }
+      }
+    }
+
+    private void recordRequest(HttpServletRequest req, HttpServletResponse 
resp) {
+      setHeaders(req);
+      setParameters(req);
+      if (null != errorCode) {
+        try {
+          resp.sendError(errorCode);
+        } catch (IOException e) {
+          throw new RuntimeException("sendError IO fail in TestServlet", e);
+        }
+      }
+    }
+
+    @Override
+    public void update(
+        SolrInputDocument document, UpdateRequest req, Integer commitWithin, 
Boolean override) {
+      numDocsRcvd.incrementAndGet();
+    }
+  } // end TestServlet
+
+  static class SendDocsRunnable implements Runnable {
+
+    private String id;
+    private int numDocs;
+    private SolrClient cuss;
+    private String collection;
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss) {
+      this(id, numDocs, cuss, null);
+    }
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss, String 
collection) {
+      this.id = id;
+      this.numDocs = numDocs;
+      this.cuss = cuss;
+      this.collection = collection;
+    }
+
+    @Override
+    public void run() {
+      for (int d = 0; d < numDocs; d++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        String docId = id + "_" + d;
+        doc.setField("id", docId);
+        UpdateRequest req = new UpdateRequest();
+        req.add(doc);
+        try {
+          if (this.collection == null) cuss.request(req);
+          else cuss.request(req, this.collection);
+        } catch (Throwable t) {
+          log.error("error making request", t);
+        }
+      }
+    }
+  }
+
+  @ClassRule public static SolrJettyTestRule solrTestRule = new 
SolrJettyTestRule();
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    JettyConfig jettyConfig =
+        JettyConfig.builder().withServlet(new 
ServletHolder(TestServlet.class), "/cuss/*").build();
+    solrTestRule.startSolr(legacyExampleCollection1SolrHome(), new 
Properties(), jettyConfig);
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {}
+
+  @Test
+  public void testConcurrentUpdate() throws Exception {
+    TestServlet.clear();
+
+    String serverUrl = solrTestRule.getBaseUrl() + "/cuss/foo";
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+
+    // for tracking callbacks from CUSS
+    final AtomicInteger successCounter = new AtomicInteger(0);
+    final AtomicInteger errorCounter = new AtomicInteger(0);
+    final StringBuilder errors = new StringBuilder();
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            outcomeCountingConcurrentClient(
+                serverUrl,
+                cussQueueSize,
+                cussThreadCount,
+                http2Client,
+                successCounter,
+                errorCounter,
+                errors)) {
+
+      // ensure it doesn't block where there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      int poolSize = 5;
+      ExecutorService threadPool =
+          ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new 
SolrNamedThreadFactory("testCUSS"));
+
+      int numDocs = 100;
+      int numRunnables = 5;
+      for (int r = 0; r < numRunnables; r++)
+        threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, 
concurrentClient));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      // wait until all requests are processed by CUSS
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+
+      assertEquals("post", TestServlet.lastMethod);
+
+      // expect all requests to be successful
+      int expectedSuccesses = TestServlet.numReqsRcvd.get();
+      assertTrue(expectedSuccesses > 0); // at least one request must have 
been sent
+
+      assertEquals(
+          "Expected no errors but got " + errorCounter.get() + ", due to: " + 
errors.toString(),
+          0,
+          errorCounter.get());
+      assertEquals(
+          "Expected " + expectedSuccesses + " successes, but got " + 
successCounter.get(),
+          successCounter.get(),
+          expectedSuccesses);
+
+      int expectedDocs = numDocs * numRunnables;
+      assertEquals(
+          "Expected CUSS to send " + expectedDocs + " but got " + 
TestServlet.numDocsRcvd.get(),
+          TestServlet.numDocsRcvd.get(),
+          expectedDocs);
+    }
+  }
+
+  @Test
+  public void testCollectionParameters() throws IOException, 
SolrServerException {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 10;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "collection");
+      concurrentClient.add("collection1", doc);
+      concurrentClient.commit("collection1");
+
+      assertEquals(
+          1,
+          concurrentClient
+              .query("collection1", new SolrQuery("id:collection"))
+              .getResults()
+              .getNumFound());
+    }
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                DEFAULT_TEST_CORENAME,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      assertEquals(
+          1, concurrentClient.query(new 
SolrQuery("id:collection")).getResults().getNumFound());
+    }
+  }
+
+  @Test
+  public void testConcurrentCollectionUpdate() throws Exception {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+    int numDocs = 100;
+    int numRunnables = 5;
+    int expected = numDocs * numRunnables;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,
+                true)) {
+
+      // ensure it doesn't block where there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      // Delete all existing documents.
+      concurrentClient.deleteByQuery("collection1", "*:*");
+
+      int poolSize = 5;
+      ExecutorService threadPool =
+          ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new 
SolrNamedThreadFactory("testCUSS"));
+
+      for (int r = 0; r < numRunnables; r++)
+        threadPool.execute(
+            new SendDocsRunnable(String.valueOf(r), numDocs, concurrentClient, 
"collection1"));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      concurrentClient.commit("collection1");
+
+      assertEquals(
+          expected,
+          concurrentClient.query("collection1", new 
SolrQuery("*:*")).getResults().getNumFound());
+
+      // wait until all requests are processed by CUSS
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+    }
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                DEFAULT_TEST_CORENAME,
+                cussThreadCount,
+                cussQueueSize,

Review Comment:
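   Swap the order of these two arguments: `concurrentClient(...)` declares `queueSize, threadCount`, but `cussThreadCount` is passed before `cussQueueSize` here.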



##########
solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.net.http.HttpConnectTimeoutException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.SolrQuery;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.embedded.JettyConfig;
+import org.apache.solr.util.SolrJettyTestRule;
+import org.eclipse.jetty.ee10.servlet.ServletHolder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConcurrentUpdateSolrClientTestBase extends 
SolrTestCaseJ4 {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs);
+
+  public abstract ConcurrentUpdateBaseSolrClient concurrentClient(
+      HttpSolrClientBase solrClient,
+      String baseUrl,
+      String defaultCollection,
+      int queueSize,
+      int threadCount,
+      boolean disablePollQueue);
+
+  public abstract ConcurrentUpdateBaseSolrClient 
outcomeCountingConcurrentClient(
+      String serverUrl,
+      int queueSize,
+      int threadCount,
+      HttpSolrClientBase solrClient,
+      AtomicInteger successCounter,
+      AtomicInteger failureCounter,
+      StringBuilder errors);
+
+  /** Mock endpoint where the CUSS being tested in this class sends requests. 
*/
+  public static class TestServlet extends HttpServlet
+      implements JavaBinUpdateRequestCodec.StreamingUpdateHandler {
+    private static final long serialVersionUID = 1L;
+
+    public static void clear() {
+      lastMethod = null;
+      headers = null;
+      parameters = null;
+      errorCode = null;
+      numReqsRcvd.set(0);
+      numDocsRcvd.set(0);
+    }
+
+    public static Integer errorCode = null;
+    public static String lastMethod = null;
+    public static HashMap<String, String> headers = null;
+    public static Map<String, String[]> parameters = null;
+    public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
+    public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
+
+    public static void setErrorCode(Integer code) {
+      errorCode = code;
+    }
+
+    private void setHeaders(HttpServletRequest req) {
+      Enumeration<String> headerNames = req.getHeaderNames();
+      headers = new HashMap<>();
+      while (headerNames.hasMoreElements()) {
+        final String name = headerNames.nextElement();
+        headers.put(name, req.getHeader(name));
+      }
+    }
+
+    private void setParameters(HttpServletRequest req) {
+      // parameters = req.getParameterMap();
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+
+      numReqsRcvd.incrementAndGet();
+      lastMethod = "post";
+      recordRequest(req, resp);
+
+      InputStream reqIn = req.getInputStream();
+      JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
+      for (; ; ) {
+        try {
+          javabin.unmarshal(reqIn, this);
+        } catch (EOFException e) {
+          break; // this is expected
+        }
+      }
+    }
+
+    private void recordRequest(HttpServletRequest req, HttpServletResponse 
resp) {
+      setHeaders(req);
+      setParameters(req);
+      if (null != errorCode) {
+        try {
+          resp.sendError(errorCode);
+        } catch (IOException e) {
+          throw new RuntimeException("sendError IO fail in TestServlet", e);
+        }
+      }
+    }
+
+    @Override
+    public void update(
+        SolrInputDocument document, UpdateRequest req, Integer commitWithin, 
Boolean override) {
+      numDocsRcvd.incrementAndGet();
+    }
+  } // end TestServlet
+
+  static class SendDocsRunnable implements Runnable {
+
+    private String id;
+    private int numDocs;
+    private SolrClient cuss;
+    private String collection;
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss) {
+      this(id, numDocs, cuss, null);
+    }
+
+    SendDocsRunnable(String id, int numDocs, SolrClient cuss, String 
collection) {
+      this.id = id;
+      this.numDocs = numDocs;
+      this.cuss = cuss;
+      this.collection = collection;
+    }
+
+    @Override
+    public void run() {
+      for (int d = 0; d < numDocs; d++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        String docId = id + "_" + d;
+        doc.setField("id", docId);
+        UpdateRequest req = new UpdateRequest();
+        req.add(doc);
+        try {
+          if (this.collection == null) cuss.request(req);
+          else cuss.request(req, this.collection);
+        } catch (Throwable t) {
+          log.error("error making request", t);
+        }
+      }
+    }
+  }
+
+  @ClassRule public static SolrJettyTestRule solrTestRule = new 
SolrJettyTestRule();
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    JettyConfig jettyConfig =
+        JettyConfig.builder().withServlet(new 
ServletHolder(TestServlet.class), "/cuss/*").build();
+    solrTestRule.startSolr(legacyExampleCollection1SolrHome(), new 
Properties(), jettyConfig);
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {}
+
+  @Test
+  public void testConcurrentUpdate() throws Exception {
+    TestServlet.clear();
+
+    String serverUrl = solrTestRule.getBaseUrl() + "/cuss/foo";
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+
+    // for tracking callbacks from CUSS
+    final AtomicInteger successCounter = new AtomicInteger(0);
+    final AtomicInteger errorCounter = new AtomicInteger(0);
+    final StringBuilder errors = new StringBuilder();
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            outcomeCountingConcurrentClient(
+                serverUrl,
+                cussQueueSize,
+                cussThreadCount,
+                http2Client,
+                successCounter,
+                errorCounter,
+                errors)) {
+
+      // ensure it doesn't block where there's nothing to do yet
+      concurrentClient.blockUntilFinished();
+
+      int poolSize = 5;
+      ExecutorService threadPool =
+          ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new 
SolrNamedThreadFactory("testCUSS"));
+
+      int numDocs = 100;
+      int numRunnables = 5;
+      for (int r = 0; r < numRunnables; r++)
+        threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs, 
concurrentClient));
+
+      // ensure all docs are sent
+      threadPool.awaitTermination(5, TimeUnit.SECONDS);
+      threadPool.shutdown();
+
+      // wait until all requests are processed by CUSS
+      concurrentClient.blockUntilFinished();
+      concurrentClient.shutdownNow();
+
+      assertEquals("post", TestServlet.lastMethod);
+
+      // expect all requests to be successful
+      int expectedSuccesses = TestServlet.numReqsRcvd.get();
+      assertTrue(expectedSuccesses > 0); // at least one request must have 
been sent
+
+      assertEquals(
+          "Expected no errors but got " + errorCounter.get() + ", due to: " + 
errors.toString(),
+          0,
+          errorCounter.get());
+      assertEquals(
+          "Expected " + expectedSuccesses + " successes, but got " + 
successCounter.get(),
+          successCounter.get(),
+          expectedSuccesses);
+
+      int expectedDocs = numDocs * numRunnables;
+      assertEquals(
+          "Expected CUSS to send " + expectedDocs + " but got " + 
TestServlet.numDocsRcvd.get(),
+          TestServlet.numDocsRcvd.get(),
+          expectedDocs);
+    }
+  }
+
+  @Test
+  public void testCollectionParameters() throws IOException, 
SolrServerException {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 10;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "collection");
+      concurrentClient.add("collection1", doc);
+      concurrentClient.commit("collection1");
+
+      assertEquals(
+          1,
+          concurrentClient
+              .query("collection1", new SolrQuery("id:collection"))
+              .getResults()
+              .getNumFound());
+    }
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                DEFAULT_TEST_CORENAME,
+                cussThreadCount,
+                cussQueueSize,
+                false)) {
+
+      assertEquals(
+          1, concurrentClient.query(new 
SolrQuery("id:collection")).getResults().getNumFound());
+    }
+  }
+
+  @Test
+  public void testConcurrentCollectionUpdate() throws Exception {
+
+    int cussThreadCount = 2;
+    int cussQueueSize = 100;
+    int numDocs = 100;
+    int numRunnables = 5;
+    int expected = numDocs * numRunnables;
+
+    try (var http2Client = solrClient(null);
+        var concurrentClient =
+            concurrentClient(
+                http2Client,
+                solrTestRule.getBaseUrl(),
+                null,
+                cussThreadCount,
+                cussQueueSize,

Review Comment:
   Swap the order of these two: queue size first, then thread count (`queueSize, threadCount`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to