dsmiley commented on code in PR #4236:
URL: https://github.com/apache/solr/pull/4236#discussion_r3079560215
##########
changelog/unreleased/SOLR-18174-prevent-double-registration.yml:
##########
@@ -0,0 +1,8 @@
+title: "Fix semaphore permit leaks in HttpJettySolrClient's AsyncTracker.
Avoid IO-thread deadlock on connection failure retries. Add a new metric gauge
solr.http.client.async_permits"
Review Comment:
The surrounding double quotes on this title are unusual; please drop them.
##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java:
##########
@@ -440,5 +443,20 @@ public void initializeMetrics(SolrMetricsContext
parentContext, Attributes attri
commExecutor =
solrMetricsContext.instrumentedExecutorService(
commExecutor, "solr.core.executor", "httpShardExecutor",
SolrInfoBean.Category.QUERY);
+ if (defaultClient != null) {
+ asyncRequestsGauge =
+ solrMetricsContext.observableLongGauge(
+ "solr.http.client.async_permits",
Review Comment:
I don't see that we use this metrics namespace (`solr.http.client.*`) anywhere else.
##########
solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java:
##########
@@ -110,6 +111,9 @@ public class HttpJettySolrClient extends HttpSolrClientBase
{
*/
public static final String CLIENT_CUSTOMIZER_SYSPROP =
"solr.solrj.http.jetty.customizer";
+ /** System property to cap the maximum number of outstanding async HTTP
requests. Default 1000. */
+ public static final String ASYNC_REQUESTS_MAX_SYSPROP =
"solr.http.client.async_requests.max";
Review Comment:
Why this sysprop namespace? The constant just above (`CLIENT_CUSTOMIZER_SYSPROP`) uses the
`solr.solrj.http.jetty.` prefix; please follow the same namespace.
##########
solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java:
##########
@@ -127,6 +131,16 @@ public class HttpJettySolrClient extends
HttpSolrClientBase {
private ExecutorService executor;
private boolean shutdownExecutor;
+ /** Fallback for {@code onFailure} dispatch; unbounded so it never rejects.
*/
+ private final ExecutorService failureDispatchExecutor =
+ new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 1,
Review Comment:
A core pool size of 1? That keeps an idle thread around just in case. Please set it to 0
instead. If an error happens, a thread may need to be created on demand, and that's fine.
##########
solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.lucene.util.SuppressForbidden;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
+import org.apache.solr.client.solrj.jetty.LBJettySolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.eclipse.jetty.client.Request;
+import org.eclipse.jetty.client.Response;
+import org.eclipse.jetty.client.Result;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for two semaphore-permit leak bugs in {@link HttpJettySolrClient}'s
{@code AsyncTracker}
+ * that cause distributed queries to hang permanently.
+ *
+ * <h3>Pattern A – HTTP/2 GOAWAY double-queue leak</h3>
+ *
+ * <p>Jetty HTTP/2 can re-queue the same exchange after a GOAWAY/connection
race, firing {@code
+ * onRequestQueued} twice for one logical request. Because {@code onComplete}
fires only once, one
+ * permit is permanently consumed per occurrence, gradually draining the
semaphore over hours or
+ * days until Pattern B triggers.
+ *
+ * <h3>Pattern B – IO-thread deadlock on LB retry when permits depleted</h3>
+ *
+ * <p>When a connection-level failure causes {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} to retry
synchronously inside a {@code
+ * whenComplete} callback on the Jetty IO selector thread, the retry calls
{@code acquire()} on that
+ * same IO thread before the original request's {@code onComplete} can call
{@code release()}. No
+ * permits are permanently lost — the deadlock simply requires two permits to
be available
+ * simultaneously — but if the semaphore is at zero, {@code acquire()} blocks
the IO thread
+ * permanently and distributed queries hang forever.
+ */
+public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION = "semaphore_leak_test";
+
+ /** Reduced semaphore size so we can observe the drain without needing
thousands of requests. */
+ private static final int MAX_PERMITS = 40;
+
+ /**
+ * Number of concurrent requests. Set equal to MAX_PERMITS so that all
permits are exhausted
+ * before any retry can acquire, triggering the IO-thread deadlock.
+ */
+ private static final int NUM_RETRY_REQUESTS = MAX_PERMITS;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // Reduce the semaphore size so we can observe drain with few requests.
+ // This property is read when HttpJettySolrClient is constructed, so it
must
+ // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up.
+ System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP,
String.valueOf(MAX_PERMITS));
+
+ configureCluster(2).addConfig("conf",
configset("cloud-dynamic")).configure();
Review Comment:
Are 2 nodes necessary to show the problem?
##########
solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.lucene.util.SuppressForbidden;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
+import org.apache.solr.client.solrj.jetty.LBJettySolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.eclipse.jetty.client.Request;
+import org.eclipse.jetty.client.Response;
+import org.eclipse.jetty.client.Result;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for two semaphore-permit leak bugs in {@link HttpJettySolrClient}'s
{@code AsyncTracker}
+ * that cause distributed queries to hang permanently.
+ *
+ * <h3>Pattern A – HTTP/2 GOAWAY double-queue leak</h3>
+ *
+ * <p>Jetty HTTP/2 can re-queue the same exchange after a GOAWAY/connection
race, firing {@code
+ * onRequestQueued} twice for one logical request. Because {@code onComplete}
fires only once, one
+ * permit is permanently consumed per occurrence, gradually draining the
semaphore over hours or
+ * days until Pattern B triggers.
+ *
+ * <h3>Pattern B – IO-thread deadlock on LB retry when permits depleted</h3>
+ *
+ * <p>When a connection-level failure causes {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} to retry
synchronously inside a {@code
+ * whenComplete} callback on the Jetty IO selector thread, the retry calls
{@code acquire()} on that
+ * same IO thread before the original request's {@code onComplete} can call
{@code release()}. No
+ * permits are permanently lost — the deadlock simply requires two permits to
be available
+ * simultaneously — but if the semaphore is at zero, {@code acquire()} blocks
the IO thread
+ * permanently and distributed queries hang forever.
+ */
+public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION = "semaphore_leak_test";
+
+ /** Reduced semaphore size so we can observe the drain without needing
thousands of requests. */
+ private static final int MAX_PERMITS = 40;
+
+ /**
+ * Number of concurrent requests. Set equal to MAX_PERMITS so that all
permits are exhausted
+ * before any retry can acquire, triggering the IO-thread deadlock.
+ */
+ private static final int NUM_RETRY_REQUESTS = MAX_PERMITS;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // Reduce the semaphore size so we can observe drain with few requests.
+ // This property is read when HttpJettySolrClient is constructed, so it
must
+ // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up.
+ System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP,
String.valueOf(MAX_PERMITS));
+
+ configureCluster(2).addConfig("conf",
configset("cloud-dynamic")).configure();
+
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+ .process(cluster.getSolrClient());
+
+ waitForState(
+ "Expected 1 active shard with 1 replica",
+ COLLECTION,
+ (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c,
2, 1));
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP);
+ }
+
+ /**
+ * Demonstrates the permanent IO-thread deadlock (Pattern B) caused by {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} retrying a request
synchronously inside a
+ * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO
selector thread.
+ *
+ * <p>This assertion <b>FAILS</b> with the current code, demonstrating the
bug. The fix would
+ * dispatch retries to an executor thread so the IO thread remains free to
fire {@code onComplete
+ * → release()}.
+ */
+ @Test
+ public void testSemaphoreLeakOnLBRetry() throws Exception {
+ // Dedicated client so that permanently deadlocked IO threads don't affect
the cluster's client.
+ HttpJettySolrClient testClient =
+ new HttpJettySolrClient.Builder()
+ .withConnectionTimeout(5, TimeUnit.SECONDS)
+ .withIdleTimeout(30, TimeUnit.SECONDS)
+ .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP
connection
+ .build();
+
+ // Fake TCP server: accepts exactly NUM_RETRY_REQUESTS connections and
holds them open.
+ // Once all are established (semaphore exhausted), closes all with RST
simultaneously.
+ ServerSocket fakeServer = new ServerSocket(0);
+ CountDownLatch allConnected = new CountDownLatch(NUM_RETRY_REQUESTS);
+ List<Socket> fakeConnections = Collections.synchronizedList(new
ArrayList<>());
+
+ Thread fakeServerThread =
+ new Thread(
+ () -> {
+ try {
+ while (fakeConnections.size() < NUM_RETRY_REQUESTS &&
!fakeServer.isClosed()) {
+ Socket s = fakeServer.accept();
+ fakeConnections.add(s);
+ allConnected.countDown();
+ }
+ } catch (IOException ignored) {
+ }
+ },
+ "fake-tcp-server");
+ fakeServerThread.setDaemon(true);
+ fakeServerThread.start();
+
+ String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() +
"/solr";
+ String realBaseUrl =
+ cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" +
COLLECTION;
+
+ List<CompletableFuture<LBSolrClient.Rsp>> futures = new ArrayList<>();
+
+ try (LBJettySolrClient lbClient = new
LBJettySolrClient.Builder(testClient).build()) {
+
+ assertEquals(
+ "All permits should be available before the test (verifies sysprop
was applied)",
+ MAX_PERMITS,
+ testClient.asyncTrackerAvailablePermits());
+
+ // Submit NUM_RETRY_REQUESTS async requests.
+ // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl
(second/retry).
+ // Each requestAsync() call acquires a semaphore permit synchronously
during send().
+ // After NUM_RETRY_REQUESTS calls, the semaphore is at 0.
+ for (int i = 0; i < NUM_RETRY_REQUESTS; i++) {
+ ModifiableSolrParams qparams = new ModifiableSolrParams();
+ qparams.set("q", "*:*");
+ QueryRequest qr = new QueryRequest(qparams);
+ LBSolrClient.Req req =
+ new LBSolrClient.Req(
+ qr,
+ List.of(
+ new LBSolrClient.Endpoint(fakeBaseUrl),
+ new LBSolrClient.Endpoint(realBaseUrl)));
+ futures.add(lbClient.requestAsync(req));
+ }
+
+ log.info(
+ "Queued {} requests (semaphore now at 0). Waiting for all TCP
connections...",
+ NUM_RETRY_REQUESTS);
+
+ // Wait until the fake server has accepted all NUM_RETRY_REQUESTS
connections.
+ // At this point all semaphore permits are consumed and no onComplete
has fired yet.
+ assertTrue(
+ "All "
+ + NUM_RETRY_REQUESTS
+ + " connections should be established within 15 s, but only "
+ + (NUM_RETRY_REQUESTS - allConnected.getCount())
+ + " were.",
+ allConnected.await(15, TimeUnit.SECONDS));
+
+ assertEquals(
+ "Semaphore should be fully consumed after queuing all requests",
+ 0,
+ testClient.asyncTrackerAvailablePermits());
+
+ // Close all fake connections simultaneously with TCP RST.
+ // onFailure fires on the IO thread → LBJettySolrClient retry →
acquire() blocks
+ // (semaphore=0).
+ int connCount = fakeConnections.size();
+ log.info("Closing {} fake connections via RST...", connCount);
+ for (Socket s : fakeConnections) {
+ try {
+ s.setSoLinger(true, 0); // send RST instead of FIN
+ s.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ try {
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
+ .get(30, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ // Individual request failure is fine; permits are released by
onComplete regardless.
+ log.warn("Some requests failed during retry", e);
+ } catch (TimeoutException e) {
+ // Force-stop the HttpClient to unblock any threads stuck in
semaphore.acquire()
+ // before asserting failure, so the finally block can close the client
without hanging.
+ try {
+ testClient.getHttpClient().stop();
+ } catch (Exception ignored) {
+ }
+ fail(
+ "BUG (LBJettySolrClient retry deadlock): futures did not complete
within 30s."
+ + " IO threads are permanently blocked in semaphore.acquire()
because the retry"
+ + " fires synchronously on the IO thread before onComplete can
release().");
+ }
+
+ int permitsAfterFailures = testClient.asyncTrackerAvailablePermits();
+ log.info("Permits after retries: {}/{}", permitsAfterFailures,
MAX_PERMITS);
+ assertEquals(
+ "All permits should be restored after retries complete",
+ MAX_PERMITS,
+ permitsAfterFailures);
+ } finally {
+ fakeServer.close();
+ try {
+ testClient.close();
+ } catch (Exception ignored) {
+ }
+ for (CompletableFuture<LBSolrClient.Rsp> f : futures) {
+ f.cancel(true);
+ }
+ }
+ }
+
+ /**
+ * Verifies that no semaphore permits are permanently leaked when
connection-level failures
+ * trigger LB retries on the Jetty IO selector thread, provided the
semaphore is not exhausted.
+ *
+ * <p>Uses only {@code 20} requests, well below the configured permit limit.
With plenty of
+ * permits available, {@code acquire()} on the IO thread returns immediately
(does not block), so
+ * {@code onComplete} fires normally and every permit is returned.
+ *
+ * <p>This test <b>passes both with and without the Pattern B fix</b>. Run
it with the fix
+ * commented out to confirm that the deadlock only manifests when the
semaphore is fully exhausted
+ * (as demonstrated by {@link #testSemaphoreLeakOnLBRetry}).
+ */
+ @Test
+ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception {
+ final int numRequests = 20;
+
+ HttpJettySolrClient testClient =
+ new HttpJettySolrClient.Builder()
+ .withConnectionTimeout(5, TimeUnit.SECONDS)
+ .withIdleTimeout(30, TimeUnit.SECONDS)
+ .useHttp1_1(true)
+ .build();
+
+ ServerSocket fakeServer = new ServerSocket(0);
+ CountDownLatch allConnected = new CountDownLatch(numRequests);
+ List<Socket> fakeConnections = Collections.synchronizedList(new
ArrayList<>());
+
+ Thread fakeServerThread =
+ new Thread(
+ () -> {
+ try {
+ while (fakeConnections.size() < numRequests &&
!fakeServer.isClosed()) {
+ Socket s = fakeServer.accept();
+ fakeConnections.add(s);
+ allConnected.countDown();
+ }
+ } catch (IOException ignored) {
+ }
+ },
+ "fake-tcp-server");
+ fakeServerThread.setDaemon(true);
+ fakeServerThread.start();
+
+ String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() +
"/solr";
+ String realBaseUrl =
+ cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" +
COLLECTION;
+
+ List<CompletableFuture<LBSolrClient.Rsp>> futures = new ArrayList<>();
+
+ try (LBJettySolrClient lbClient = new
LBJettySolrClient.Builder(testClient).build()) {
+
+ int initialPermits = testClient.asyncTrackerMaxPermits();
+ assertTrue("numRequests must be well below permit limit", numRequests <
initialPermits);
+ assertEquals(
+ "All permits available before test",
+ initialPermits,
+ testClient.asyncTrackerAvailablePermits());
+
+ for (int i = 0; i < numRequests; i++) {
+ ModifiableSolrParams p = new ModifiableSolrParams();
+ p.set("q", "*:*");
Review Comment:
FYI: we have `SolrParams.of(key, value)` for trivial cases like this.
##########
solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.lucene.util.SuppressForbidden;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
+import org.apache.solr.client.solrj.jetty.LBJettySolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.eclipse.jetty.client.Request;
+import org.eclipse.jetty.client.Response;
+import org.eclipse.jetty.client.Result;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for two semaphore-permit leak bugs in {@link HttpJettySolrClient}'s
{@code AsyncTracker}
+ * that cause distributed queries to hang permanently.
+ *
+ * <h3>Pattern A – HTTP/2 GOAWAY double-queue leak</h3>
+ *
+ * <p>Jetty HTTP/2 can re-queue the same exchange after a GOAWAY/connection
race, firing {@code
+ * onRequestQueued} twice for one logical request. Because {@code onComplete}
fires only once, one
+ * permit is permanently consumed per occurrence, gradually draining the
semaphore over hours or
+ * days until Pattern B triggers.
+ *
+ * <h3>Pattern B – IO-thread deadlock on LB retry when permits depleted</h3>
+ *
+ * <p>When a connection-level failure causes {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} to retry
synchronously inside a {@code
+ * whenComplete} callback on the Jetty IO selector thread, the retry calls
{@code acquire()} on that
+ * same IO thread before the original request's {@code onComplete} can call
{@code release()}. No
+ * permits are permanently lost — the deadlock simply requires two permits to
be available
+ * simultaneously — but if the semaphore is at zero, {@code acquire()} blocks
the IO thread
+ * permanently and distributed queries hang forever.
+ */
+public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION = "semaphore_leak_test";
+
+ /** Reduced semaphore size so we can observe the drain without needing
thousands of requests. */
+ private static final int MAX_PERMITS = 40;
+
+ /**
+ * Number of concurrent requests. Set equal to MAX_PERMITS so that all
permits are exhausted
+ * before any retry can acquire, triggering the IO-thread deadlock.
+ */
+ private static final int NUM_RETRY_REQUESTS = MAX_PERMITS;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // Reduce the semaphore size so we can observe drain with few requests.
+ // This property is read when HttpJettySolrClient is constructed, so it
must
+ // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up.
+ System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP,
String.valueOf(MAX_PERMITS));
+
+ configureCluster(2).addConfig("conf",
configset("cloud-dynamic")).configure();
+
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+ .process(cluster.getSolrClient());
+
+ waitForState(
+ "Expected 1 active shard with 1 replica",
+ COLLECTION,
+ (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c,
2, 1));
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP);
+ }
+
+ /**
+ * Demonstrates the permanent IO-thread deadlock (Pattern B) caused by {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} retrying a request
synchronously inside a
+ * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO
selector thread.
+ *
+ * <p>This assertion <b>FAILS</b> with the current code, demonstrating the
bug. The fix would
+ * dispatch retries to an executor thread so the IO thread remains free to
fire {@code onComplete
+ * → release()}.
+ */
+ @Test
+ public void testSemaphoreLeakOnLBRetry() throws Exception {
+ // Dedicated client so that permanently deadlocked IO threads don't affect
the cluster's client.
+ HttpJettySolrClient testClient =
+ new HttpJettySolrClient.Builder()
+ .withConnectionTimeout(5, TimeUnit.SECONDS)
+ .withIdleTimeout(30, TimeUnit.SECONDS)
+ .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP
connection
+ .build();
+
+ // Fake TCP server: accepts exactly NUM_RETRY_REQUESTS connections and
holds them open.
+ // Once all are established (semaphore exhausted), closes all with RST
simultaneously.
+ ServerSocket fakeServer = new ServerSocket(0);
+ CountDownLatch allConnected = new CountDownLatch(NUM_RETRY_REQUESTS);
+ List<Socket> fakeConnections = Collections.synchronizedList(new
ArrayList<>());
+
+ Thread fakeServerThread =
+ new Thread(
+ () -> {
+ try {
+ while (fakeConnections.size() < NUM_RETRY_REQUESTS &&
!fakeServer.isClosed()) {
+ Socket s = fakeServer.accept();
+ fakeConnections.add(s);
+ allConnected.countDown();
+ }
+ } catch (IOException ignored) {
+ }
+ },
+ "fake-tcp-server");
+ fakeServerThread.setDaemon(true);
+ fakeServerThread.start();
+
+ String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() +
"/solr";
+ String realBaseUrl =
+ cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" +
COLLECTION;
+
+ List<CompletableFuture<LBSolrClient.Rsp>> futures = new ArrayList<>();
+
+ try (LBJettySolrClient lbClient = new
LBJettySolrClient.Builder(testClient).build()) {
+
+ assertEquals(
+ "All permits should be available before the test (verifies sysprop
was applied)",
+ MAX_PERMITS,
+ testClient.asyncTrackerAvailablePermits());
+
+ // Submit NUM_RETRY_REQUESTS async requests.
+ // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl
(second/retry).
+ // Each requestAsync() call acquires a semaphore permit synchronously
during send().
+ // After NUM_RETRY_REQUESTS calls, the semaphore is at 0.
+ for (int i = 0; i < NUM_RETRY_REQUESTS; i++) {
+ ModifiableSolrParams qparams = new ModifiableSolrParams();
+ qparams.set("q", "*:*");
+ QueryRequest qr = new QueryRequest(qparams);
+ LBSolrClient.Req req =
+ new LBSolrClient.Req(
+ qr,
+ List.of(
+ new LBSolrClient.Endpoint(fakeBaseUrl),
+ new LBSolrClient.Endpoint(realBaseUrl)));
+ futures.add(lbClient.requestAsync(req));
+ }
+
+ log.info(
+ "Queued {} requests (semaphore now at 0). Waiting for all TCP
connections...",
+ NUM_RETRY_REQUESTS);
+
+ // Wait until the fake server has accepted all NUM_RETRY_REQUESTS
connections.
+ // At this point all semaphore permits are consumed and no onComplete
has fired yet.
+ assertTrue(
+ "All "
+ + NUM_RETRY_REQUESTS
+ + " connections should be established within 15 s, but only "
+ + (NUM_RETRY_REQUESTS - allConnected.getCount())
+ + " were.",
+ allConnected.await(15, TimeUnit.SECONDS));
+
+ assertEquals(
+ "Semaphore should be fully consumed after queuing all requests",
+ 0,
+ testClient.asyncTrackerAvailablePermits());
+
+ // Close all fake connections simultaneously with TCP RST.
+ // onFailure fires on the IO thread → LBJettySolrClient retry →
acquire() blocks
+ // (semaphore=0).
+ int connCount = fakeConnections.size();
+ log.info("Closing {} fake connections via RST...", connCount);
+ for (Socket s : fakeConnections) {
+ try {
+ s.setSoLinger(true, 0); // send RST instead of FIN
+ s.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ try {
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
+ .get(30, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ // Individual request failure is fine; permits are released by
onComplete regardless.
+ log.warn("Some requests failed during retry", e);
+ } catch (TimeoutException e) {
+ // Force-stop the HttpClient to unblock any threads stuck in
semaphore.acquire()
+ // before asserting failure, so the finally block can close the client
without hanging.
+ try {
+ testClient.getHttpClient().stop();
+ } catch (Exception ignored) {
+ }
+ fail(
+ "BUG (LBJettySolrClient retry deadlock): futures did not complete
within 30s."
+ + " IO threads are permanently blocked in semaphore.acquire()
because the retry"
+ + " fires synchronously on the IO thread before onComplete can
release().");
+ }
+
+ int permitsAfterFailures = testClient.asyncTrackerAvailablePermits();
+ log.info("Permits after retries: {}/{}", permitsAfterFailures,
MAX_PERMITS);
+ assertEquals(
+ "All permits should be restored after retries complete",
+ MAX_PERMITS,
+ permitsAfterFailures);
+ } finally {
+ fakeServer.close();
+ try {
+ testClient.close();
+ } catch (Exception ignored) {
+ }
+ for (CompletableFuture<LBSolrClient.Rsp> f : futures) {
+ f.cancel(true);
+ }
+ }
+ }
+
+ /**
+ * Verifies that no semaphore permits are permanently leaked when
connection-level failures
+ * trigger LB retries on the Jetty IO selector thread, provided the
semaphore is not exhausted.
+ *
+ * <p>Uses only {@code 20} requests, well below the configured permit limit.
With plenty of
+ * permits available, {@code acquire()} on the IO thread returns immediately
(does not block), so
+ * {@code onComplete} fires normally and every permit is returned.
+ *
+ * <p>This test <b>passes both with and without the Pattern B fix</b>. Run
it with the fix
+ * commented out to confirm that the deadlock only manifests when the
semaphore is fully exhausted
+ * (as demonstrated by {@link #testSemaphoreLeakOnLBRetry}).
+ */
+ @Test
+ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception {
+ final int numRequests = 20;
+
+ HttpJettySolrClient testClient =
+ new HttpJettySolrClient.Builder()
+ .withConnectionTimeout(5, TimeUnit.SECONDS)
+ .withIdleTimeout(30, TimeUnit.SECONDS)
+ .useHttp1_1(true)
+ .build();
+
+ ServerSocket fakeServer = new ServerSocket(0);
+ CountDownLatch allConnected = new CountDownLatch(numRequests);
+ List<Socket> fakeConnections = Collections.synchronizedList(new
ArrayList<>());
+
+ Thread fakeServerThread =
Review Comment:
We need to ensure this thread is definitely stopped when this test ends. Our test
infrastructure insists (and it is a best practice) that threads don't "leak".
##########
solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.lucene.util.SuppressForbidden;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
+import org.apache.solr.client.solrj.jetty.LBJettySolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.eclipse.jetty.client.Request;
+import org.eclipse.jetty.client.Response;
+import org.eclipse.jetty.client.Result;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for two semaphore-permit leak bugs in {@link HttpJettySolrClient}'s
{@code AsyncTracker}
+ * that cause distributed queries to hang permanently.
+ *
+ * <h3>Pattern A – HTTP/2 GOAWAY double-queue leak</h3>
+ *
+ * <p>Jetty HTTP/2 can re-queue the same exchange after a GOAWAY/connection
race, firing {@code
+ * onRequestQueued} twice for one logical request. Because {@code onComplete}
fires only once, one
+ * permit is permanently consumed per occurrence, gradually draining the
semaphore over hours or
+ * days until Pattern B triggers.
+ *
+ * <h3>Pattern B – IO-thread deadlock on LB retry when permits depleted</h3>
+ *
+ * <p>When a connection-level failure causes {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} to retry
synchronously inside a {@code
+ * whenComplete} callback on the Jetty IO selector thread, the retry calls
{@code acquire()} on that
+ * same IO thread before the original request's {@code onComplete} can call
{@code release()}. No
+ * permits are permanently lost — the deadlock simply requires two permits to
be available
+ * simultaneously — but if the semaphore is at zero, {@code acquire()} blocks
the IO thread
+ * permanently and distributed queries hang forever.
+ */
+public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION = "semaphore_leak_test";
+
+ /** Reduced semaphore size so we can observe the drain without needing
thousands of requests. */
+ private static final int MAX_PERMITS = 40;
+
+ /**
+ * Number of concurrent requests. Set equal to MAX_PERMITS so that all
permits are exhausted
+ * before any retry can acquire, triggering the IO-thread deadlock.
+ */
+ private static final int NUM_RETRY_REQUESTS = MAX_PERMITS;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // Reduce the semaphore size so we can observe drain with few requests.
+ // This property is read when HttpJettySolrClient is constructed, so it
must
+ // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up.
+ System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP,
String.valueOf(MAX_PERMITS));
+
+ configureCluster(2).addConfig("conf",
configset("cloud-dynamic")).configure();
+
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+ .process(cluster.getSolrClient());
+
+ waitForState(
+ "Expected 1 active shard with 1 replica",
+ COLLECTION,
+ (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c,
2, 1));
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP);
+ }
+
+ /**
+ * Demonstrates the permanent IO-thread deadlock (Pattern B) caused by {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} retrying a request
synchronously inside a
+ * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO
selector thread.
+ *
+ * <p>This assertion <b>FAILS</b> with the current code, demonstrating the
bug. The fix would
+ * dispatch retries to an executor thread so the IO thread remains free to
fire {@code onComplete
+ * → release()}.
+ */
+ @Test
+ public void testSemaphoreLeakOnLBRetry() throws Exception {
+ // Dedicated client so that permanently deadlocked IO threads don't affect
the cluster's client.
+ HttpJettySolrClient testClient =
+ new HttpJettySolrClient.Builder()
+ .withConnectionTimeout(5, TimeUnit.SECONDS)
+ .withIdleTimeout(30, TimeUnit.SECONDS)
+ .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP
connection
+ .build();
+
+ // Fake TCP server: accepts exactly NUM_RETRY_REQUESTS connections and
holds them open.
+ // Once all are established (semaphore exhausted), closes all with RST
simultaneously.
+ ServerSocket fakeServer = new ServerSocket(0);
+ CountDownLatch allConnected = new CountDownLatch(NUM_RETRY_REQUESTS);
+ List<Socket> fakeConnections = Collections.synchronizedList(new
ArrayList<>());
+
+ Thread fakeServerThread =
+ new Thread(
+ () -> {
+ try {
+ while (fakeConnections.size() < NUM_RETRY_REQUESTS &&
!fakeServer.isClosed()) {
+ Socket s = fakeServer.accept();
+ fakeConnections.add(s);
+ allConnected.countDown();
+ }
+ } catch (IOException ignored) {
+ }
+ },
+ "fake-tcp-server");
+ fakeServerThread.setDaemon(true);
+ fakeServerThread.start();
+
+ String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() +
"/solr";
+ String realBaseUrl =
+ cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" +
COLLECTION;
+
+ List<CompletableFuture<LBSolrClient.Rsp>> futures = new ArrayList<>();
+
+ try (LBJettySolrClient lbClient = new
LBJettySolrClient.Builder(testClient).build()) {
+
+ assertEquals(
+ "All permits should be available before the test (verifies sysprop
was applied)",
+ MAX_PERMITS,
+ testClient.asyncTrackerAvailablePermits());
+
+ // Submit NUM_RETRY_REQUESTS async requests.
+ // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl
(second/retry).
+ // Each requestAsync() call acquires a semaphore permit synchronously
during send().
+ // After NUM_RETRY_REQUESTS calls, the semaphore is at 0.
+ for (int i = 0; i < NUM_RETRY_REQUESTS; i++) {
+ ModifiableSolrParams qparams = new ModifiableSolrParams();
+ qparams.set("q", "*:*");
+ QueryRequest qr = new QueryRequest(qparams);
+ LBSolrClient.Req req =
+ new LBSolrClient.Req(
+ qr,
+ List.of(
+ new LBSolrClient.Endpoint(fakeBaseUrl),
+ new LBSolrClient.Endpoint(realBaseUrl)));
+ futures.add(lbClient.requestAsync(req));
+ }
+
+ log.info(
+ "Queued {} requests (semaphore now at 0). Waiting for all TCP
connections...",
+ NUM_RETRY_REQUESTS);
+
+ // Wait until the fake server has accepted all NUM_RETRY_REQUESTS
connections.
+ // At this point all semaphore permits are consumed and no onComplete
has fired yet.
+ assertTrue(
+ "All "
+ + NUM_RETRY_REQUESTS
+ + " connections should be established within 15 s, but only "
+ + (NUM_RETRY_REQUESTS - allConnected.getCount())
+ + " were.",
+ allConnected.await(15, TimeUnit.SECONDS));
+
+ assertEquals(
+ "Semaphore should be fully consumed after queuing all requests",
+ 0,
+ testClient.asyncTrackerAvailablePermits());
+
+ // Close all fake connections simultaneously with TCP RST.
+ // onFailure fires on the IO thread → LBJettySolrClient retry →
acquire() blocks
+ // (semaphore=0).
+ int connCount = fakeConnections.size();
+ log.info("Closing {} fake connections via RST...", connCount);
+ for (Socket s : fakeConnections) {
+ try {
+ s.setSoLinger(true, 0); // send RST instead of FIN
+ s.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ try {
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
+ .get(30, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ // Individual request failure is fine; permits are released by
onComplete regardless.
+ log.warn("Some requests failed during retry", e);
+ } catch (TimeoutException e) {
+ // Force-stop the HttpClient to unblock any threads stuck in
semaphore.acquire()
+ // before asserting failure, so the finally block can close the client
without hanging.
+ try {
+ testClient.getHttpClient().stop();
+ } catch (Exception ignored) {
+ }
+ fail(
+ "BUG (LBJettySolrClient retry deadlock): futures did not complete
within 30s."
+ + " IO threads are permanently blocked in semaphore.acquire()
because the retry"
+ + " fires synchronously on the IO thread before onComplete can
release().");
+ }
+
+ int permitsAfterFailures = testClient.asyncTrackerAvailablePermits();
+ log.info("Permits after retries: {}/{}", permitsAfterFailures,
MAX_PERMITS);
+ assertEquals(
+ "All permits should be restored after retries complete",
+ MAX_PERMITS,
+ permitsAfterFailures);
+ } finally {
+ fakeServer.close();
+ try {
+ testClient.close();
+ } catch (Exception ignored) {
+ }
+ for (CompletableFuture<LBSolrClient.Rsp> f : futures) {
+ f.cancel(true);
+ }
+ }
+ }
+
+ /**
+ * Verifies that no semaphore permits are permanently leaked when
connection-level failures
+ * trigger LB retries on the Jetty IO selector thread, provided the
semaphore is not exhausted.
+ *
+ * <p>Uses only {@code 20} requests, well below the configured permit limit.
With plenty of
+ * permits available, {@code acquire()} on the IO thread returns immediately
(does not block), so
+ * {@code onComplete} fires normally and every permit is returned.
+ *
+ * <p>This test <b>passes both with and without the Pattern B fix</b>. Run
it with the fix
+ * commented out to confirm that the deadlock only manifests when the
semaphore is fully exhausted
+ * (as demonstrated by {@link #testSemaphoreLeakOnLBRetry}).
+ */
+ @Test
+ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception {
+ final int numRequests = 20;
+
+ HttpJettySolrClient testClient =
+ new HttpJettySolrClient.Builder()
+ .withConnectionTimeout(5, TimeUnit.SECONDS)
+ .withIdleTimeout(30, TimeUnit.SECONDS)
+ .useHttp1_1(true)
+ .build();
+
+ ServerSocket fakeServer = new ServerSocket(0);
+ CountDownLatch allConnected = new CountDownLatch(numRequests);
+ List<Socket> fakeConnections = Collections.synchronizedList(new
ArrayList<>());
+
+ Thread fakeServerThread =
+ new Thread(
+ () -> {
+ try {
+ while (fakeConnections.size() < numRequests &&
!fakeServer.isClosed()) {
+ Socket s = fakeServer.accept();
+ fakeConnections.add(s);
+ allConnected.countDown();
+ }
+ } catch (IOException ignored) {
Review Comment:
Swallowing an IOException here could make debugging problems really difficult!
I think logging an error makes sense — or, if the exception is more "expected" in this test
(I don't think so?), logging a warning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]