This is an automated email from the ASF dual-hosted git repository.
jdyer pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_10x by this push:
new 708fa0723f5 SOLR-18065: ConcurrentUpdateBaseSolrClient: support for HttpJdkSolrClient (#4050)
708fa0723f5 is described below
commit 708fa0723f58f18f5cc9a3b7ed54481b7baf21c6
Author: James Dyer <[email protected]>
AuthorDate: Fri Jan 16 14:20:48 2026 -0600
SOLR-18065: ConcurrentUpdateBaseSolrClient: support for HttpJdkSolrClient (#4050)
---
changelog/unreleased/SOLR-18065.yml | 7 +
.../randomization/policies/solr-tests.policy | 8 +
.../modules/deployment-guide/pages/solrj.adoc | 1 +
solr/solrj-jetty/build.gradle | 2 -
.../jetty/ConcurrentUpdateJettySolrClientTest.java | 405 +++------------------
.../solrj/impl/ConcurrentUpdateBaseSolrClient.java | 4 +-
.../solrj/impl/ConcurrentUpdateJdkSolrClient.java | 102 ++++++
.../solr/client/solrj/impl/HttpJdkSolrClient.java | 18 +-
.../solrj/impl/ClusterStateProviderTest.java | 4 +-
.../impl/ConcurrentUpdateJdkSolrClientTest.java | 122 +++++++
.../impl/ConcurrentUpdateSolrClientTestBase.java} | 178 +++++----
11 files changed, 394 insertions(+), 457 deletions(-)
diff --git a/changelog/unreleased/SOLR-18065.yml b/changelog/unreleased/SOLR-18065.yml
new file mode 100644
index 00000000000..3da09e4b089
--- /dev/null
+++ b/changelog/unreleased/SOLR-18065.yml
@@ -0,0 +1,7 @@
+title: Added new ConcurrentUpdateJdkSolrClient that works with HttpJdkSolrClient
+type: added
+authors:
+ - name: James Dyer
+links:
+ - name: SOLR-18065
+ url: https://issues.apache.org/jira/browse/SOLR-18065
diff --git a/gradle/testing/randomization/policies/solr-tests.policy b/gradle/testing/randomization/policies/solr-tests.policy
index d95d4b245dc..eeddeb41809 100644
--- a/gradle/testing/randomization/policies/solr-tests.policy
+++ b/gradle/testing/randomization/policies/solr-tests.policy
@@ -197,6 +197,14 @@ grant {
permission "java.net.URLPermission" "https://[::1]:*/solr/-", "HEAD,GET,PUT,POST:*";
permission "java.net.URLPermission" "socket://[::1]:*", "CONNECT:*";
+ // for java.net.http.HttpClient. See ConcurrentUpdateJdkSolrClientTest
+ permission "java.net.URLPermission" "http://127.0.0.1:*/noOneThere/-", "HEAD,GET,PUT,POST:*";
+ permission "java.net.URLPermission" "https://127.0.0.1:*/noOneThere/-", "HEAD,GET,PUT,POST:*";
+ permission "java.net.URLPermission" "http://localhost:*/noOneThere/-", "HEAD,GET,PUT,POST:*";
+ permission "java.net.URLPermission" "https://localhost:*/noOneThere/-", "HEAD,GET,PUT,POST:*";
+ permission "java.net.URLPermission" "http://[::1]:*/noOneThere/-", "HEAD,GET,PUT,POST:*";
+ permission "java.net.URLPermission" "https://[::1]:*/noOneThere/-", "HEAD,GET,PUT,POST:*";
+
// Needed by TestCoreDiscovery
permission java.lang.RuntimePermission "accessUserInformation";
};
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc
index 40f2dea68d9..b7792a4bc71 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc
@@ -127,6 +127,7 @@ Requests are sent in the form of {solr-javadocs}/solrj/org/apache/solr/client/so
 - {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/CloudSolrClient.html[`CloudSolrClient`] - the ideal client for SolrCloud. Using the "cluster state", it routes requests to the optimal nodes, including splitting out the documents in an UpdateRequest to different nodes.
 - {solr-javadocs}/solrj-jetty/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClient.html[`ConcurrentUpdateJettySolrClient`] - geared towards indexing-centric workloads.
Buffers documents internally before sending larger batches to Solr.
+- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.html[`ConcurrentUpdateJdkSolrClient`] - similar to `ConcurrentUpdateJettySolrClient` but delegates to `HttpJdkSolrClient` and has no dependencies.
=== Common Configuration Options
diff --git a/solr/solrj-jetty/build.gradle b/solr/solrj-jetty/build.gradle
index 6f3f6eda4e2..0186f025260 100644
--- a/solr/solrj-jetty/build.gradle
+++ b/solr/solrj-jetty/build.gradle
@@ -55,8 +55,6 @@ dependencies {
testImplementation libs.junit.junit
testImplementation libs.hamcrest.hamcrest
- testImplementation libs.jakarta.servlet.api
-
testImplementation libs.eclipse.jetty.ee10.servlet
testRuntimeOnly(libs.eclipse.jetty.alpnjavaserver, {
exclude group: "org.eclipse.jetty.alpn", module: "alpn-api"
diff --git
a/solr/solrj-jetty/src/test/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClientTest.java
b/solr/solrj-jetty/src/test/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClientTest.java
index 9c89cf3d27b..aa49a737a50 100644
---
a/solr/solrj-jetty/src/test/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClientTest.java
+++
b/solr/solrj-jetty/src/test/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClientTest.java
@@ -17,338 +17,70 @@
package org.apache.solr.client.solrj.jetty;
-import jakarta.servlet.ServletException;
-import jakarta.servlet.http.HttpServlet;
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.servlet.http.HttpServletResponse;
-import java.io.EOFException;
-import java.io.IOException;
import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.SocketTimeoutException;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
-import org.apache.solr.client.solrj.request.SolrQuery;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
-import org.apache.solr.embedded.JettyConfig;
-import org.apache.solr.util.SolrJettyTestRule;
-import org.eclipse.jetty.ee10.servlet.ServletHolder;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConcurrentUpdateJettySolrClientTest extends SolrTestCaseJ4 {
- private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- /** Mock endpoint where the CUSS being tested in this class sends requests.
*/
- public static class TestServlet extends HttpServlet
- implements JavaBinUpdateRequestCodec.StreamingUpdateHandler {
- private static final long serialVersionUID = 1L;
-
- public static void clear() {
- lastMethod = null;
- headers = null;
- parameters = null;
- errorCode = null;
- numReqsRcvd.set(0);
- numDocsRcvd.set(0);
- }
-
- public static Integer errorCode = null;
- public static String lastMethod = null;
- public static HashMap<String, String> headers = null;
- public static Map<String, String[]> parameters = null;
- public static AtomicInteger numReqsRcvd = new AtomicInteger(0);
- public static AtomicInteger numDocsRcvd = new AtomicInteger(0);
-
- public static void setErrorCode(Integer code) {
- errorCode = code;
- }
-
- private void setHeaders(HttpServletRequest req) {
- Enumeration<String> headerNames = req.getHeaderNames();
- headers = new HashMap<>();
- while (headerNames.hasMoreElements()) {
- final String name = headerNames.nextElement();
- headers.put(name, req.getHeader(name));
- }
- }
-
- private void setParameters(HttpServletRequest req) {
- // parameters = req.getParameterMap();
- }
-
- @Override
- protected void doPost(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException {
-
- numReqsRcvd.incrementAndGet();
- lastMethod = "post";
- recordRequest(req, resp);
-
- InputStream reqIn = req.getInputStream();
- JavaBinUpdateRequestCodec javabin = new JavaBinUpdateRequestCodec();
- for (; ; ) {
- try {
- javabin.unmarshal(reqIn, this);
- } catch (EOFException e) {
- break; // this is expected
- }
- }
- }
-
- private void recordRequest(HttpServletRequest req, HttpServletResponse
resp) {
- setHeaders(req);
- setParameters(req);
- if (null != errorCode) {
- try {
- resp.sendError(errorCode);
- } catch (IOException e) {
- throw new RuntimeException("sendError IO fail in TestServlet", e);
- }
- }
- }
-
- @Override
- public void update(
- SolrInputDocument document, UpdateRequest req, Integer commitWithin,
Boolean override) {
- numDocsRcvd.incrementAndGet();
- }
- } // end TestServlet
-
- static class SendDocsRunnable implements Runnable {
-
- private String id;
- private int numDocs;
- private SolrClient cuss;
- private String collection;
-
- SendDocsRunnable(String id, int numDocs, SolrClient cuss) {
- this(id, numDocs, cuss, null);
- }
-
- SendDocsRunnable(String id, int numDocs, SolrClient cuss, String
collection) {
- this.id = id;
- this.numDocs = numDocs;
- this.cuss = cuss;
- this.collection = collection;
- }
-
- @Override
- public void run() {
- for (int d = 0; d < numDocs; d++) {
- SolrInputDocument doc = new SolrInputDocument();
- String docId = id + "_" + d;
- doc.setField("id", docId);
- UpdateRequest req = new UpdateRequest();
- req.add(doc);
- try {
- if (this.collection == null) cuss.request(req);
- else cuss.request(req, this.collection);
- } catch (Throwable t) {
- log.error("error making request", t);
- }
- }
- }
- }
-
- @ClassRule public static SolrJettyTestRule solrTestRule = new
SolrJettyTestRule();
-
- @BeforeClass
- public static void beforeTest() throws Exception {
- JettyConfig jettyConfig =
- JettyConfig.builder().withServlet(new
ServletHolder(TestServlet.class), "/cuss/*").build();
- solrTestRule.startSolr(legacyExampleCollection1SolrHome(), new
Properties(), jettyConfig);
- }
-
- @Test
- public void testConcurrentUpdate() throws Exception {
- TestServlet.clear();
-
- String serverUrl = solrTestRule.getBaseUrl() + "/cuss/foo";
-
- int cussThreadCount = 2;
- int cussQueueSize = 100;
-
- // for tracking callbacks from CUSS
- final AtomicInteger successCounter = new AtomicInteger(0);
- final AtomicInteger errorCounter = new AtomicInteger(0);
- final StringBuilder errors = new StringBuilder();
-
- try (var http2Client = new HttpJettySolrClient.Builder().build();
- var concurrentClient =
- new OutcomeCountingConcurrentUpdateSolrClient.Builder(
- serverUrl, http2Client, successCounter, errorCounter,
errors)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .setPollQueueTime(0, TimeUnit.MILLISECONDS)
- .build()) {
-
- // ensure it doesn't block where there's nothing to do yet
- concurrentClient.blockUntilFinished();
-
- int poolSize = 5;
- ExecutorService threadPool =
- ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new
SolrNamedThreadFactory("testCUSS"));
-
- int numDocs = 100;
- int numRunnables = 5;
- for (int r = 0; r < numRunnables; r++)
- threadPool.execute(new SendDocsRunnable(String.valueOf(r), numDocs,
concurrentClient));
-
- // ensure all docs are sent
- threadPool.awaitTermination(5, TimeUnit.SECONDS);
- threadPool.shutdown();
-
- // wait until all requests are processed by CUSS
- concurrentClient.blockUntilFinished();
- concurrentClient.shutdownNow();
-
- assertEquals("post", TestServlet.lastMethod);
-
- // expect all requests to be successful
- int expectedSuccesses = TestServlet.numReqsRcvd.get();
- assertTrue(expectedSuccesses > 0); // at least one request must have
been sent
-
- assertEquals(
- "Expected no errors but got " + errorCounter.get() + ", due to: " +
errors.toString(),
- 0,
- errorCounter.get());
- assertEquals(
- "Expected " + expectedSuccesses + " successes, but got " +
successCounter.get(),
- successCounter.get(),
- expectedSuccesses);
-
- int expectedDocs = numDocs * numRunnables;
- assertEquals(
- "Expected CUSS to send " + expectedDocs + " but got " +
TestServlet.numDocsRcvd.get(),
- TestServlet.numDocsRcvd.get(),
- expectedDocs);
- }
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClientTestBase;
+import org.apache.solr.client.solrj.impl.HttpSolrClientBase;
+
+public class ConcurrentUpdateJettySolrClientTest extends
ConcurrentUpdateSolrClientTestBase {
+
+ @Override
+ public ConcurrentUpdateBaseSolrClient outcomeCountingConcurrentClient(
+ String serverUrl,
+ int queueSize,
+ int threadCount,
+ HttpSolrClientBase solrClient,
+ AtomicInteger successCounter,
+ AtomicInteger failureCounter,
+ StringBuilder errors) {
+ return new OutcomeCountingConcurrentUpdateJettySolrClient.Builder(
+ serverUrl, (HttpJettySolrClient) solrClient, successCounter,
failureCounter, errors)
+ .withQueueSize(queueSize)
+ .withThreadCount(threadCount)
+ .setPollQueueTime(0, TimeUnit.MILLISECONDS)
+ .build();
}
- @Test
- public void testCollectionParameters() throws IOException,
SolrServerException {
-
- int cussThreadCount = 2;
- int cussQueueSize = 10;
-
- try (var http2Client = new HttpJettySolrClient.Builder().build();
- var concurrentClient =
- (new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client))
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .build()) {
-
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField("id", "collection");
- concurrentClient.add("collection1", doc);
- concurrentClient.commit("collection1");
-
- assertEquals(
- 1,
- concurrentClient
- .query("collection1", new SolrQuery("id:collection"))
- .getResults()
- .getNumFound());
- }
-
- try (var http2Client = new HttpJettySolrClient.Builder().build();
- var concurrentClient =
- new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client)
- .withDefaultCollection(DEFAULT_TEST_CORENAME)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .build()) {
-
- assertEquals(
- 1, concurrentClient.query(new
SolrQuery("id:collection")).getResults().getNumFound());
+ @Override
+ public HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs) {
+ var builder = new HttpJettySolrClient.Builder();
+ if (overrideIdleTimeoutMs != null) {
+ builder.withIdleTimeout(overrideIdleTimeoutMs, TimeUnit.MILLISECONDS);
}
+ return builder.build();
}
- @Test
- public void testConcurrentCollectionUpdate() throws Exception {
-
- int cussThreadCount = 2;
- int cussQueueSize = 100;
- int numDocs = 100;
- int numRunnables = 5;
- int expected = numDocs * numRunnables;
-
- try (var http2Client = new HttpJettySolrClient.Builder().build();
- var concurrentClient =
- new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .setPollQueueTime(0, TimeUnit.MILLISECONDS)
- .build()) {
-
- // ensure it doesn't block where there's nothing to do yet
- concurrentClient.blockUntilFinished();
-
- // Delete all existing documents.
- concurrentClient.deleteByQuery("collection1", "*:*");
-
- int poolSize = 5;
- ExecutorService threadPool =
- ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new
SolrNamedThreadFactory("testCUSS"));
-
- for (int r = 0; r < numRunnables; r++)
- threadPool.execute(
- new SendDocsRunnable(String.valueOf(r), numDocs, concurrentClient,
"collection1"));
-
- // ensure all docs are sent
- threadPool.awaitTermination(5, TimeUnit.SECONDS);
- threadPool.shutdown();
-
- concurrentClient.commit("collection1");
-
- assertEquals(
- expected,
- concurrentClient.query("collection1", new
SolrQuery("*:*")).getResults().getNumFound());
-
- // wait until all requests are processed by CUSS
- concurrentClient.blockUntilFinished();
- concurrentClient.shutdownNow();
- }
-
- try (var http2Client = new HttpJettySolrClient.Builder().build();
- var concurrentClient =
- new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client)
- .withDefaultCollection(DEFAULT_TEST_CORENAME)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .build()) {
-
- assertEquals(
- expected, concurrentClient.query(new
SolrQuery("*:*")).getResults().getNumFound());
- }
+ @Override
+ public ConcurrentUpdateBaseSolrClient concurrentClient(
+ HttpSolrClientBase solrClient,
+ String baseUrl,
+ String defaultCollection,
+ int queueSize,
+ int threadCount,
+ boolean disablePollQueue) {
+ var builder =
+ new ConcurrentUpdateJettySolrClient.Builder(baseUrl,
(HttpJettySolrClient) solrClient)
+ .withQueueSize(queueSize)
+ .withThreadCount(threadCount);
+ if (defaultCollection != null) {
+ builder.withDefaultCollection(defaultCollection);
+ }
+ if (disablePollQueue) {
+ builder.setPollQueueTime(0, TimeUnit.MILLISECONDS);
+ }
+ return builder.build();
}
- static class OutcomeCountingConcurrentUpdateSolrClient extends
ConcurrentUpdateJettySolrClient {
+ public static class OutcomeCountingConcurrentUpdateJettySolrClient
+ extends ConcurrentUpdateJettySolrClient {
private final AtomicInteger successCounter;
private final AtomicInteger failureCounter;
private final StringBuilder errors;
- public OutcomeCountingConcurrentUpdateSolrClient(
- OutcomeCountingConcurrentUpdateSolrClient.Builder builder) {
+ public OutcomeCountingConcurrentUpdateJettySolrClient(
+ OutcomeCountingConcurrentUpdateJettySolrClient.Builder builder) {
super(builder);
this.successCounter = builder.successCounter;
this.failureCounter = builder.failureCounter;
@@ -366,7 +98,7 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
successCounter.incrementAndGet();
}
- static class Builder extends ConcurrentUpdateJettySolrClient.Builder {
+ public static class Builder extends
ConcurrentUpdateJettySolrClient.Builder {
protected final AtomicInteger successCounter;
protected final AtomicInteger failureCounter;
protected final StringBuilder errors;
@@ -384,40 +116,9 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
}
@Override
- public OutcomeCountingConcurrentUpdateSolrClient build() {
- return new OutcomeCountingConcurrentUpdateSolrClient(this);
- }
- }
- }
-
- /**
- * Test that connection timeout information is passed to the HttpSolrClient
that handles non add
- * operations.
- */
- @Test(timeout = 10000)
- public void testSocketTimeoutOnCommit() throws IOException,
SolrServerException {
- InetAddress localHost = InetAddress.getLocalHost();
- try (ServerSocket server = new ServerSocket(0, 1, localHost);
- var http2Client =
- new HttpJettySolrClient.Builder().withIdleTimeout(1,
TimeUnit.MILLISECONDS).build();
- var client =
- new ConcurrentUpdateJettySolrClient.Builder(
- "http://"
- + localHost.getHostAddress()
- + ":"
- + server.getLocalPort()
- + "/noOneThere",
- http2Client)
- .build()) {
- // Expecting an exception
- client.commit();
- fail();
- } catch (SolrServerException e) {
- if (!(e.getCause() instanceof SocketTimeoutException // not sure if
Jetty throws this
- || e.getCause() instanceof TimeoutException)) { // Jetty throws this
- throw e;
+ public OutcomeCountingConcurrentUpdateJettySolrClient build() {
+ return new OutcomeCountingConcurrentUpdateJettySolrClient(this);
}
- // else test passes
}
}
}
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateBaseSolrClient.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateBaseSolrClient.java
index 25f24de0ed8..59513af9481 100644
---
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateBaseSolrClient.java
+++
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateBaseSolrClient.java
@@ -77,7 +77,7 @@ public abstract class ConcurrentUpdateBaseSolrClient extends
SolrClient {
private final int queueSize;
private final E backdoorE;
- public CustomBlockingQueue(int queueSize, int maxConsumers, E backdoorE) {
+ public CustomBlockingQueue(int queueSize, E backdoorE) {
queue = new LinkedBlockingQueue<>();
available = new Semaphore(queueSize);
this.queueSize = queueSize;
@@ -152,7 +152,7 @@ public abstract class ConcurrentUpdateBaseSolrClient
extends SolrClient {
this.shutdownClient = builder.closeHttpClient;
this.idleTimeoutMillis = builder.idleTimeoutMillis;
this.threadCount = builder.threadCount;
- this.queue = new CustomBlockingQueue<>(builder.queueSize, threadCount,
END_UPDATE);
+ this.queue = new CustomBlockingQueue<>(builder.queueSize, END_UPDATE);
this.runners = new ArrayDeque<>();
this.streamDeletes = builder.streamDeletes;
this.basePath = builder.baseSolrUrl;
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java
new file mode 100644
index 00000000000..87bceda8408
--- /dev/null
+++
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClient.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.http.HttpResponse;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+/** A ConcurrentUpdate SolrClient using {@link HttpJdkSolrClient}. */
+public class ConcurrentUpdateJdkSolrClient extends
ConcurrentUpdateBaseSolrClient {
+
+ private final HttpJdkSolrClient client;
+
+ protected
ConcurrentUpdateJdkSolrClient(ConcurrentUpdateJdkSolrClient.Builder builder) {
+ super(builder);
+ this.client = (HttpJdkSolrClient) builder.getClient();
+ }
+
+ @Override
+ protected StreamingResponse doSendUpdateStream(Update update) {
+ UpdateRequest req = update.request();
+ String collection = update.collection();
+ CompletableFuture<HttpResponse<InputStream>> resp =
+ client.requestInputStreamAsync(basePath, req, collection);
+
+ return new StreamingResponse() {
+
+ @Override
+ public int awaitResponse(long timeoutMillis) throws Exception {
+ return resp.get(timeoutMillis, TimeUnit.MILLISECONDS).statusCode();
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ try {
+ return resp.get().body();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return InputStream.nullInputStream();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Object getUnderlyingResponse() {
+ return resp;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op: InputStream is managed by java.net.http.HttpClient
+ }
+ };
+ }
+
+ public static class Builder extends ConcurrentUpdateBaseSolrClient.Builder {
+ /**
+ * @see
org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient.Builder#Builder(String,
+ * HttpSolrClientBase)
+ */
+ public Builder(String baseUrl, HttpJdkSolrClient client) {
+
+ this(baseUrl, client, false);
+ // The base class uses idle timeout with StreamingResponse#awaitResponse
so it needs to be
+ // set!
+ this.idleTimeoutMillis = SolrHttpConstants.DEFAULT_SO_TIMEOUT;
+ }
+
+ /**
+ * @see
org.apache.solr.client.solrj.impl.ConcurrentUpdateBaseSolrClient.Builder#Builder(String,
+ * HttpSolrClientBase, boolean)
+ */
+ public Builder(String baseSolrUrl, HttpSolrClientBase client, boolean
closeHttpClient) {
+ super(baseSolrUrl, client, closeHttpClient);
+ }
+
+ @Override
+ public ConcurrentUpdateJdkSolrClient build() {
+ return new ConcurrentUpdateJdkSolrClient(this);
+ }
+ }
+}
diff --git
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
index 2fa11c81b7e..53b538c56f4 100644
---
a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
+++
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
@@ -143,11 +143,23 @@ public class HttpJdkSolrClient extends HttpSolrClientBase
{
assert ObjectReleaseTracker.track(this);
}
+ protected CompletableFuture<HttpResponse<InputStream>>
requestInputStreamAsync(
+ String baseUrl, final SolrRequest<?> solrRequest, String collection) {
+ try {
+ HttpRequest httpRequest = prepareRequest(baseUrl, solrRequest,
collection).reqb.build();
+ return httpClient.sendAsync(httpRequest,
HttpResponse.BodyHandlers.ofInputStream());
+ } catch (Exception e) {
+ CompletableFuture<HttpResponse<InputStream>> cf = new
CompletableFuture<>();
+ cf.completeExceptionally(e);
+ return cf;
+ }
+ }
+
@Override
public CompletableFuture<NamedList<Object>> requestAsync(
final SolrRequest<?> solrRequest, String collection) {
try {
- PreparedRequest pReq = prepareRequest(solrRequest, collection, null);
+ PreparedRequest pReq = prepareRequest(null, solrRequest, collection);
return httpClient
.sendAsync(pReq.reqb.build(),
HttpResponse.BodyHandlers.ofInputStream())
.thenApply(
@@ -170,7 +182,7 @@ public class HttpJdkSolrClient extends HttpSolrClientBase {
public NamedList<Object> requestWithBaseUrl(
String baseUrl, SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
- PreparedRequest pReq = prepareRequest(solrRequest, collection, baseUrl);
+ PreparedRequest pReq = prepareRequest(baseUrl, solrRequest, collection);
HttpResponse<InputStream> response = null;
try {
response = httpClient.send(pReq.reqb.build(),
HttpResponse.BodyHandlers.ofInputStream());
@@ -209,7 +221,7 @@ public class HttpJdkSolrClient extends HttpSolrClientBase {
}
protected PreparedRequest prepareRequest(
- SolrRequest<?> solrRequest, String collection, String overrideBaseUrl)
+ String overrideBaseUrl, SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
checkClosed();
if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
diff --git
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
index 80f7b5e625b..663fbf9eb1a 100644
---
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
+++
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
@@ -66,9 +66,9 @@ public class ClusterStateProviderTest extends
SolrCloudTestCase {
@Override
protected PreparedRequest prepareRequest(
- SolrRequest<?> solrRequest, String collection, String overrideBaseUrl)
+ String overrideBaseUrl, SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
- var pr = super.prepareRequest(solrRequest, collection, overrideBaseUrl);
+ var pr = super.prepareRequest(overrideBaseUrl, solrRequest, collection);
pr.reqb.header("User-Agent", userAgent);
return pr;
}
diff --git
a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClientTest.java
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClientTest.java
new file mode 100644
index 00000000000..f306dc36edc
--- /dev/null
+++
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateJdkSolrClientTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ConcurrentUpdateJdkSolrClientTest extends
ConcurrentUpdateSolrClientTestBase {
+
+ @Override
+ public HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs) {
+ var builder =
+ new
HttpJdkSolrClient.Builder().withSSLContext(MockTrustManager.ALL_TRUSTING_SSL_CONTEXT);
+ if (overrideIdleTimeoutMs != null) {
+ builder.withIdleTimeout(overrideIdleTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public ConcurrentUpdateBaseSolrClient concurrentClient(
+ HttpSolrClientBase solrClient,
+ String baseUrl,
+ String defaultCollection,
+ int queueSize,
+ int threadCount,
+ boolean disablePollQueue) {
+ var builder =
+ new ConcurrentUpdateJdkSolrClient.Builder(baseUrl, (HttpJdkSolrClient)
solrClient)
+ .withQueueSize(queueSize)
+ .withThreadCount(threadCount);
+ if (defaultCollection != null) {
+ builder.withDefaultCollection(defaultCollection);
+ }
+ if (disablePollQueue) {
+ builder.setPollQueueTime(0, TimeUnit.MILLISECONDS);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public ConcurrentUpdateBaseSolrClient outcomeCountingConcurrentClient(
+ String serverUrl,
+ int queueSize,
+ int threadCount,
+ HttpSolrClientBase solrClient,
+ AtomicInteger successCounter,
+ AtomicInteger failureCounter,
+ StringBuilder errors) {
+ return new OutcomeCountingConcurrentUpdateSolrClient.Builder(
+ serverUrl, (HttpJdkSolrClient) solrClient, successCounter,
failureCounter, errors)
+ .withQueueSize(queueSize)
+ .withThreadCount(threadCount)
+ .setPollQueueTime(0, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ public static class OutcomeCountingConcurrentUpdateSolrClient
+ extends ConcurrentUpdateJdkSolrClient {
+ private final AtomicInteger successCounter;
+ private final AtomicInteger failureCounter;
+ private final StringBuilder errors;
+
+ public OutcomeCountingConcurrentUpdateSolrClient(
+ OutcomeCountingConcurrentUpdateSolrClient.Builder builder) {
+ super(builder);
+ this.successCounter = builder.successCounter;
+ this.failureCounter = builder.failureCounter;
+ this.errors = builder.errors;
+ }
+
+ @Override
+ public void handleError(Throwable ex) {
+ failureCounter.incrementAndGet();
+ errors.append(" " + ex);
+ }
+
+ @Override
+ public void onSuccess(Object responseMetadata, InputStream respBody) {
+ successCounter.incrementAndGet();
+ }
+
+ public static class Builder extends ConcurrentUpdateJdkSolrClient.Builder {
+ protected final AtomicInteger successCounter;
+ protected final AtomicInteger failureCounter;
+ protected final StringBuilder errors;
+
+ public Builder(
+ String baseSolrUrl,
+ HttpJdkSolrClient http2Client,
+ AtomicInteger successCounter,
+ AtomicInteger failureCounter,
+ StringBuilder errors) {
+ super(baseSolrUrl, http2Client);
+ this.successCounter = successCounter;
+ this.failureCounter = failureCounter;
+ this.errors = errors;
+ }
+
+ @Override
+ public OutcomeCountingConcurrentUpdateSolrClient build() {
+ return new OutcomeCountingConcurrentUpdateSolrClient(this);
+ }
+ }
+ }
+}
diff --git
a/solr/solrj-jetty/src/test/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClientTest.java
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java
similarity index 72%
copy from
solr/solrj-jetty/src/test/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClientTest.java
copy to
solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java
index 9c89cf3d27b..c52e7dd33cd 100644
---
a/solr/solrj-jetty/src/test/org/apache/solr/client/solrj/jetty/ConcurrentUpdateJettySolrClientTest.java
+++
b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTestBase.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.solr.client.solrj.jetty;
+package org.apache.solr.client.solrj.impl;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
@@ -28,6 +28,7 @@ import java.lang.invoke.MethodHandles;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
+import java.net.http.HttpConnectTimeoutException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@@ -48,15 +49,35 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.embedded.JettyConfig;
import org.apache.solr.util.SolrJettyTestRule;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConcurrentUpdateJettySolrClientTest extends SolrTestCaseJ4 {
+public abstract class ConcurrentUpdateSolrClientTestBase extends
SolrTestCaseJ4 {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ /**
+ * Supplies the HTTP client implementation under test; the tests in this class close it via
+ * try-with-resources.
+ *
+ * @param overrideIdleTimeoutMs {@code null} in most tests; the socket-timeout test passes
+ * {@code 1}. Presumably an idle timeout in milliseconds that replaces the implementation's
+ * default - confirm against the concrete subclasses.
+ */
+ public abstract HttpSolrClientBase solrClient(Integer overrideIdleTimeoutMs);
+
+ /**
+ * Supplies the concurrent-update client under test, layered on the given HTTP client.
+ *
+ * @param solrClient the HTTP client obtained from {@link #solrClient(Integer)}
+ * @param baseUrl base URL the client sends requests to
+ * @param defaultCollection default collection/core name; the tests pass {@code null} when none
+ * should be set
+ * @param queueSize queue size to configure
+ * @param threadCount thread count to configure
+ * @param disablePollQueue the tests pass {@code true} where the poll-queue time was previously
+ * set to 0 ms - presumably implementations should do the same; verify in the subclasses
+ */
+ public abstract ConcurrentUpdateBaseSolrClient concurrentClient(
+ HttpSolrClientBase solrClient,
+ String baseUrl,
+ String defaultCollection,
+ int queueSize,
+ int threadCount,
+ boolean disablePollQueue);
+
+ /**
+ * Supplies a concurrent-update client that reports request outcomes through the given counters
+ * and error buffer.
+ *
+ * @param serverUrl URL the client sends requests to
+ * @param queueSize queue size to configure
+ * @param threadCount thread count to configure
+ * @param solrClient the HTTP client obtained from {@link #solrClient(Integer)}
+ * @param successCounter expected to be incremented once per successful request
+ * @param failureCounter expected to be incremented once per failed request
+ * @param errors expected to receive a description of each failure
+ */
+ public abstract ConcurrentUpdateBaseSolrClient
outcomeCountingConcurrentClient(
+ String serverUrl,
+ int queueSize,
+ int threadCount,
+ HttpSolrClientBase solrClient,
+ AtomicInteger successCounter,
+ AtomicInteger failureCounter,
+ StringBuilder errors);
+
/** Mock endpoint where the CUSS being tested in this class sends requests.
*/
public static class TestServlet extends HttpServlet
implements JavaBinUpdateRequestCodec.StreamingUpdateHandler {
@@ -91,10 +112,6 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
}
}
- private void setParameters(HttpServletRequest req) {
- // parameters = req.getParameterMap();
- }
-
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
@@ -116,7 +133,6 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
private void recordRequest(HttpServletRequest req, HttpServletResponse
resp) {
setHeaders(req);
- setParameters(req);
if (null != errorCode) {
try {
resp.sendError(errorCode);
@@ -178,6 +194,9 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
solrTestRule.startSolr(legacyExampleCollection1SolrHome(), new
Properties(), jettyConfig);
}
+ // NOTE(review): the body is empty, so this hook performs no class-level cleanup as written.
+ @AfterClass
+ public static void afterTest() throws Exception {}
+
@Test
public void testConcurrentUpdate() throws Exception {
TestServlet.clear();
@@ -192,14 +211,16 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
final AtomicInteger errorCounter = new AtomicInteger(0);
final StringBuilder errors = new StringBuilder();
- try (var http2Client = new HttpJettySolrClient.Builder().build();
+ try (var http2Client = solrClient(null);
var concurrentClient =
- new OutcomeCountingConcurrentUpdateSolrClient.Builder(
- serverUrl, http2Client, successCounter, errorCounter,
errors)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .setPollQueueTime(0, TimeUnit.MILLISECONDS)
- .build()) {
+ outcomeCountingConcurrentClient(
+ serverUrl,
+ cussQueueSize,
+ cussThreadCount,
+ http2Client,
+ successCounter,
+ errorCounter,
+ errors)) {
// ensure it doesn't block where there's nothing to do yet
concurrentClient.blockUntilFinished();
@@ -250,12 +271,15 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
int cussThreadCount = 2;
int cussQueueSize = 10;
- try (var http2Client = new HttpJettySolrClient.Builder().build();
+ try (var http2Client = solrClient(null);
var concurrentClient =
- (new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client))
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .build()) {
+ concurrentClient(
+ http2Client,
+ solrTestRule.getBaseUrl(),
+ null,
+ cussQueueSize,
+ cussThreadCount,
+ false)) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "collection");
@@ -270,13 +294,15 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
.getNumFound());
}
- try (var http2Client = new HttpJettySolrClient.Builder().build();
+ try (var http2Client = solrClient(null);
var concurrentClient =
- new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client)
- .withDefaultCollection(DEFAULT_TEST_CORENAME)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .build()) {
+ concurrentClient(
+ http2Client,
+ solrTestRule.getBaseUrl(),
+ DEFAULT_TEST_CORENAME,
+ cussQueueSize,
+ cussThreadCount,
+ false)) {
assertEquals(
1, concurrentClient.query(new
SolrQuery("id:collection")).getResults().getNumFound());
@@ -292,13 +318,15 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
int numRunnables = 5;
int expected = numDocs * numRunnables;
- try (var http2Client = new HttpJettySolrClient.Builder().build();
+ try (var http2Client = solrClient(null);
var concurrentClient =
- new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .setPollQueueTime(0, TimeUnit.MILLISECONDS)
- .build()) {
+ concurrentClient(
+ http2Client,
+ solrTestRule.getBaseUrl(),
+ null,
+ cussQueueSize,
+ cussThreadCount,
+ true)) {
// ensure it doesn't block where there's nothing to do yet
concurrentClient.blockUntilFinished();
@@ -329,67 +357,21 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
concurrentClient.shutdownNow();
}
- try (var http2Client = new HttpJettySolrClient.Builder().build();
+ try (var http2Client = solrClient(null);
var concurrentClient =
- new
ConcurrentUpdateJettySolrClient.Builder(solrTestRule.getBaseUrl(), http2Client)
- .withDefaultCollection(DEFAULT_TEST_CORENAME)
- .withQueueSize(cussQueueSize)
- .withThreadCount(cussThreadCount)
- .build()) {
+ concurrentClient(
+ http2Client,
+ solrTestRule.getBaseUrl(),
+ DEFAULT_TEST_CORENAME,
+ cussQueueSize,
+ cussThreadCount,
+ false)) {
assertEquals(
expected, concurrentClient.query(new
SolrQuery("*:*")).getResults().getNumFound());
}
}
- static class OutcomeCountingConcurrentUpdateSolrClient extends
ConcurrentUpdateJettySolrClient {
- private final AtomicInteger successCounter;
- private final AtomicInteger failureCounter;
- private final StringBuilder errors;
-
- public OutcomeCountingConcurrentUpdateSolrClient(
- OutcomeCountingConcurrentUpdateSolrClient.Builder builder) {
- super(builder);
- this.successCounter = builder.successCounter;
- this.failureCounter = builder.failureCounter;
- this.errors = builder.errors;
- }
-
- @Override
- public void handleError(Throwable ex) {
- failureCounter.incrementAndGet();
- errors.append(" " + ex);
- }
-
- @Override
- public void onSuccess(Object responseMetadata, InputStream respBody) {
- successCounter.incrementAndGet();
- }
-
- static class Builder extends ConcurrentUpdateJettySolrClient.Builder {
- protected final AtomicInteger successCounter;
- protected final AtomicInteger failureCounter;
- protected final StringBuilder errors;
-
- public Builder(
- String baseSolrUrl,
- HttpJettySolrClient http2Client,
- AtomicInteger successCounter,
- AtomicInteger failureCounter,
- StringBuilder errors) {
- super(baseSolrUrl, http2Client);
- this.successCounter = successCounter;
- this.failureCounter = failureCounter;
- this.errors = errors;
- }
-
- @Override
- public OutcomeCountingConcurrentUpdateSolrClient build() {
- return new OutcomeCountingConcurrentUpdateSolrClient(this);
- }
- }
- }
-
/**
* Test that connection timeout information is passed to the HttpSolrClient
that handles non add
* operations.
@@ -398,23 +380,27 @@ public class ConcurrentUpdateJettySolrClientTest extends
SolrTestCaseJ4 {
public void testSocketTimeoutOnCommit() throws IOException,
SolrServerException {
InetAddress localHost = InetAddress.getLocalHost();
try (ServerSocket server = new ServerSocket(0, 1, localHost);
- var http2Client =
- new HttpJettySolrClient.Builder().withIdleTimeout(1,
TimeUnit.MILLISECONDS).build();
+ var http2Client = solrClient(1);
var client =
- new ConcurrentUpdateJettySolrClient.Builder(
- "http://"
- + localHost.getHostAddress()
- + ":"
- + server.getLocalPort()
- + "/noOneThere",
- http2Client)
- .build()) {
+ concurrentClient(
+ http2Client,
+ "http://"
+ + localHost.getHostAddress()
+ + ":"
+ + server.getLocalPort()
+ + "/noOneThere",
+ null,
+ 10,
+ 1,
+ false)) {
// Expecting an exception
client.commit();
fail();
} catch (SolrServerException e) {
if (!(e.getCause() instanceof SocketTimeoutException // not sure if
Jetty throws this
- || e.getCause() instanceof TimeoutException)) { // Jetty throws this
+ || e.getCause() instanceof TimeoutException // Jetty throws this
+ || e.getCause() instanceof HttpConnectTimeoutException // jdk client
throws this
+ )) {
throw e;
}
// else test passes