This is an automated email from the ASF dual-hosted git repository.

janhoy pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_10x by this push:
     new b6bd7d5961f SOLR-18174 AsyncTracker Semaphore permit leak fix (#4236)
b6bd7d5961f is described below

commit b6bd7d5961fb86fe9a9d64ad759c38b4700dd899
Author: Jan Høydahl <[email protected]>
AuthorDate: Mon Apr 20 00:20:40 2026 +0200

    SOLR-18174 AsyncTracker Semaphore permit leak fix (#4236)
    
    Also add metric solr_client_request_async_permits
    Make max async requests configurable with sysprop 
solr.solrj.http.jetty.async_requests.max
    
    (cherry picked from commit 3792f2dc670142eb94b7fb9b657a6950d112cc44)
---
 .../SOLR-18174-prevent-double-registration.yml     |   8 +
 .../handler/component/HttpShardHandlerFactory.java |  18 +
 .../component/AsyncTrackerSemaphoreLeakTest.java   | 403 +++++++++++++++++++++
 .../configuration-guide/pages/solr-properties.adoc |   2 +
 .../deployment-guide/pages/metrics-reporting.adoc  |  11 +
 .../pages/major-changes-in-solr-10.adoc            |   8 +
 .../client/solrj/jetty/HttpJettySolrClient.java    |  85 ++++-
 7 files changed, 526 insertions(+), 9 deletions(-)

diff --git a/changelog/unreleased/SOLR-18174-prevent-double-registration.yml 
b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml
new file mode 100644
index 00000000000..ce816f4fece
--- /dev/null
+++ b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml
@@ -0,0 +1,8 @@
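+title: Fix semaphore permit leaks in HttpJettySolrClient's AsyncTracker. Avoid IO-thread deadlock on connection failure retries. Make the maximum number of outstanding async requests configurable via the solr.solrj.http.jetty.async_requests.max system property, and add a new metric gauge solr.client.request.async_permits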
+type: fixed
+authors:
+  - name: Jan Høydahl
+    url: https://home.apache.org/phonebook.html?uid=janhoy
+links:
+  - name: SOLR-18174
+    url: https://issues.apache.org/jira/browse/SOLR-18174
diff --git 
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
 
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index 6dc30f47b9b..fa2392c5a1d 100644
--- 
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ 
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -19,6 +19,7 @@ package org.apache.solr.handler.component;
 import static 
org.apache.solr.util.stats.InstrumentedHttpListenerFactory.KNOWN_METRIC_NAME_STRATEGIES;
 
 import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
 import java.lang.invoke.MethodHandles;
 import java.util.Iterator;
 import java.util.List;
@@ -85,6 +86,7 @@ public class HttpShardHandlerFactory extends 
ShardHandlerFactory
   protected volatile HttpJettySolrClient defaultClient;
   protected InstrumentedHttpListenerFactory httpListenerFactory;
   protected LBAsyncSolrClient loadbalancer;
+  private ObservableLongGauge asyncRequestsGauge;
 
   int corePoolSize = 0;
   int maximumPoolSize = Integer.MAX_VALUE;
@@ -352,6 +354,7 @@ public class HttpShardHandlerFactory extends 
ShardHandlerFactory
         ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
       }
     }
+    IOUtils.closeQuietly(asyncRequestsGauge);
     try {
       SolrMetricProducer.super.close();
     } catch (Exception e) {
@@ -440,5 +443,20 @@ public class HttpShardHandlerFactory extends 
ShardHandlerFactory
     commExecutor =
         solrMetricsContext.instrumentedExecutorService(
             commExecutor, "solr.core.executor", "httpShardExecutor", 
SolrInfoBean.Category.QUERY);
+    if (defaultClient != null) {
+      asyncRequestsGauge =
+          solrMetricsContext.observableLongGauge(
+              "solr.client.request.async_permits",
+              "Outstanding async HTTP request permits in the Jetty SolrJ 
client"
+                  + " (state=max: configured ceiling; state=available: 
currently unused permits).",
+              measurement -> {
+                measurement.record(
+                    defaultClient.asyncTrackerMaxPermits(), 
Attributes.of(STATE_KEY_ATTR, "max"));
+                measurement.record(
+                    defaultClient.asyncTrackerAvailablePermits(),
+                    Attributes.of(STATE_KEY_ATTR, "available"));
+              },
+              null);
+    }
   }
 }
diff --git 
a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java
 
b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java
new file mode 100644
index 00000000000..1ff3ec542ed
--- /dev/null
+++ 
b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.util.SuppressForbidden;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
+import org.apache.solr.client.solrj.jetty.LBJettySolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.SolrParams;
+import org.eclipse.jetty.client.Request;
+import org.eclipse.jetty.client.Response;
+import org.eclipse.jetty.client.Result;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for two semaphore-permit leak bugs in {@link HttpJettySolrClient}'s 
{@code AsyncTracker}
+ * that cause distributed queries to hang permanently.
+ *
+ * <h3>Pattern A – HTTP/2 GOAWAY double-queue leak</h3>
+ *
+ * <p>Jetty HTTP/2 can re-queue the same exchange after a GOAWAY/connection 
race, firing {@code
+ * onRequestQueued} twice for one logical request. Because {@code onComplete} 
fires only once, one
+ * permit is permanently consumed per occurrence, gradually draining the 
semaphore over hours or
+ * days until Pattern B triggers.
+ *
+ * <h3>Pattern B – IO-thread deadlock on LB retry when permits depleted</h3>
+ *
+ * <p>When a connection-level failure causes {@link
+ * org.apache.solr.client.solrj.jetty.LBJettySolrClient} to retry 
synchronously inside a {@code
+ * whenComplete} callback on the Jetty IO selector thread, the retry calls 
{@code acquire()} on that
+ * same IO thread before the original request's {@code onComplete} can call 
{@code release()}. No
+ * permits are permanently lost — the deadlock simply requires two permits to 
be available
+ * simultaneously — but if the semaphore is at zero, {@code acquire()} blocks 
the IO thread
+ * permanently and distributed queries hang forever.
+ */
+public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String COLLECTION = "semaphore_leak_test";
+
+  /** Reduced semaphore size so we can observe the drain without needing 
thousands of requests. */
+  private static final int MAX_PERMITS = 40;
+
+  /**
+   * Number of concurrent requests. Set equal to MAX_PERMITS so that all 
permits are exhausted
+   * before any retry can acquire, triggering the IO-thread deadlock.
+   */
+  private static final int NUM_RETRY_REQUESTS = MAX_PERMITS;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    // Reduce the semaphore size so we can observe drain with few requests.
+    // This property is read when HttpJettySolrClient is constructed, so it 
must
+    // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up.
+    System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, 
String.valueOf(MAX_PERMITS));
+
+    configureCluster(1).addConfig("conf", 
configset("cloud-dynamic")).configure();
+
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+        .process(cluster.getSolrClient());
+
+    waitForState(
+        "Expected 1 active shard with 1 replica",
+        COLLECTION,
+        (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c, 
2, 1));
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP);
+  }
+
+  /**
+   * Demonstrates the permanent IO-thread deadlock (Pattern B) caused by {@link
+   * org.apache.solr.client.solrj.jetty.LBJettySolrClient} retrying a request 
synchronously inside a
+   * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO 
selector thread.
+   *
+   * <p>This test <b>passes</b> with the {@code failureDispatchExecutor} fix 
in this branch. Without
+   * the fix, the IO thread would block forever in {@code semaphore.acquire()} 
and this test would
+   * time out.
+   */
+  @Test
+  public void testSemaphoreLeakOnLBRetry() throws Exception {
+    // Dedicated client so that permanently deadlocked IO threads don't affect 
the cluster's client.
+    HttpJettySolrClient testClient =
+        new HttpJettySolrClient.Builder()
+            .withConnectionTimeout(5, TimeUnit.SECONDS)
+            .withIdleTimeout(30, TimeUnit.SECONDS)
+            .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP 
connection
+            .build();
+
+    String realBaseUrl =
+        cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + 
COLLECTION;
+
+    List<CompletableFuture<LBSolrClient.Rsp>> futures = new ArrayList<>();
+
+    try (FakeTcpServer fakeServer = new FakeTcpServer(NUM_RETRY_REQUESTS);
+        LBJettySolrClient lbClient = new 
LBJettySolrClient.Builder(testClient).build()) {
+
+      assertEquals(
+          "All permits should be available before the test (verifies sysprop 
was applied)",
+          MAX_PERMITS,
+          testClient.asyncTrackerAvailablePermits());
+
+      // Submit NUM_RETRY_REQUESTS async requests.
+      // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl 
(second/retry).
+      // Each requestAsync() call acquires a semaphore permit synchronously 
during send().
+      // After NUM_RETRY_REQUESTS calls, the semaphore is at 0.
+      for (int i = 0; i < NUM_RETRY_REQUESTS; i++) {
+        QueryRequest qr = new QueryRequest(SolrParams.of("q", "*:*"));
+        LBSolrClient.Req req =
+            new LBSolrClient.Req(
+                qr,
+                List.of(
+                    new LBSolrClient.Endpoint(fakeServer.baseUrl()),
+                    new LBSolrClient.Endpoint(realBaseUrl)));
+        futures.add(lbClient.requestAsync(req));
+      }
+
+      log.info(
+          "Queued {} requests (semaphore now at 0). Waiting for all TCP 
connections...",
+          NUM_RETRY_REQUESTS);
+
+      // Wait until the fake server has accepted all NUM_RETRY_REQUESTS 
connections.
+      // At this point all semaphore permits are consumed and no onComplete 
has fired yet.
+      assertTrue(
+          "All "
+              + NUM_RETRY_REQUESTS
+              + " connections should be established within 15 s, but only "
+              + fakeServer.connectionCount()
+              + " were.",
+          fakeServer.awaitAllConnected(15, TimeUnit.SECONDS));
+
+      assertEquals(
+          "Semaphore should be fully consumed after queuing all requests",
+          0,
+          testClient.asyncTrackerAvailablePermits());
+
+      // Close all fake connections simultaneously with TCP RST.
+      // onFailure fires on the IO thread → LBJettySolrClient retry → 
acquire() blocks
+      // (semaphore=0).
+      int connCount = fakeServer.connectionCount();
+      log.info("Closing {} fake connections via RST...", connCount);
+      fakeServer.rstAll();
+
+      try {
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
+            .get(30, TimeUnit.SECONDS);
+      } catch (ExecutionException e) {
+        // Individual request failure is fine; permits are released by 
onComplete regardless.
+        log.warn("Some requests failed during retry", e);
+      } catch (TimeoutException e) {
+        // Force-stop the HttpClient to unblock any threads stuck in 
semaphore.acquire()
+        // before asserting failure, so the finally block can close the client 
without hanging.
+        try {
+          testClient.getHttpClient().stop();
+        } catch (Exception ignored) {
+          log.debug("Failed to stop HttpClient");
+        }
+        fail(
+            "BUG (LBJettySolrClient retry deadlock): futures did not complete 
within 30s."
+                + " IO threads are permanently blocked in semaphore.acquire() 
because the retry"
+                + " fires synchronously on the IO thread before onComplete can 
release().");
+      }
+
+      int permitsAfterFailures = testClient.asyncTrackerAvailablePermits();
+      log.info("Permits after retries: {}/{}", permitsAfterFailures, 
MAX_PERMITS);
+      assertEquals(
+          "All permits should be restored after retries complete",
+          MAX_PERMITS,
+          permitsAfterFailures);
+    } finally {
+      try {
+        testClient.close();
+      } catch (Exception ignored) {
+        log.debug("Failed to close LBJettySolrClient");
+      }
+      for (CompletableFuture<LBSolrClient.Rsp> f : futures) {
+        f.cancel(true);
+      }
+    }
+  }
+
+  /**
+   * Verifies that the {@code PERMIT_ACQUIRED_ATTR} idempotency guard prevents 
the Pattern A permit
+   * leak where Jetty HTTP/2 re-queues the same exchange after a 
GOAWAY/connection race, firing
+   * {@code onRequestQueued} twice for one logical request while {@code 
onComplete} fires only once.
+   *
+   * <p>Rather than setting up a real HTTP/2 server, this test uses reflection 
to invoke {@code
+   * AsyncTracker.queuedListener} twice and {@code 
AsyncTracker.completeListener} once for the same
+   * {@code Request} object. Without the guard the semaphore count drops by 
one; with the guard the
+   * second queued call is a no-op and the count is unchanged.
+   */
+  @Test
+  @SuppressForbidden(
+      reason =
+          "Reflection needed to access AsyncTracker's private fields for 
white-box testing without exposing them in the production API")
+  public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws 
Exception {
+    assumeWorkingMockito();
+
+    HttpJettySolrClient testClient =
+        new HttpJettySolrClient.Builder()
+            .withConnectionTimeout(5, TimeUnit.SECONDS)
+            .withIdleTimeout(30, TimeUnit.SECONDS)
+            // HTTP/2 is the default transport where this GOAWAY race occurs.
+            .build();
+
+    // Capture asyncTracker and its class for reflection-based listener access 
and cleanup.
+    Field asyncTrackerField = 
HttpJettySolrClient.class.getDeclaredField("asyncTracker");
+    asyncTrackerField.setAccessible(true);
+    Object asyncTracker = asyncTrackerField.get(testClient);
+    Class<?> asyncTrackerClass = asyncTracker.getClass();
+
+    try {
+      int maxPermits = testClient.asyncTrackerMaxPermits();
+      assertEquals(
+          "All permits available before test",
+          maxPermits,
+          testClient.asyncTrackerAvailablePermits());
+
+      // Access the raw listeners via reflection to simulate Jetty's internal 
double-fire.
+      Field queuedListenerField = 
asyncTrackerClass.getDeclaredField("queuedListener");
+      queuedListenerField.setAccessible(true);
+      Request.QueuedListener queuedListener =
+          (Request.QueuedListener) queuedListenerField.get(asyncTracker);
+
+      Field completeListenerField = 
asyncTrackerClass.getDeclaredField("completeListener");
+      completeListenerField.setAccessible(true);
+      Response.CompleteListener completeListener =
+          (Response.CompleteListener) completeListenerField.get(asyncTracker);
+
+      // Fake Request that supports the attribute get/set used by the 
idempotency guard.
+      Map<String, Object> reqAttributes = new HashMap<>();
+      Request fakeRequest = Mockito.mock(Request.class);
+      Mockito.when(fakeRequest.getAttributes()).thenReturn(reqAttributes);
+      Mockito.when(fakeRequest.attribute(ArgumentMatchers.anyString(), 
ArgumentMatchers.any()))
+          .thenAnswer(
+              inv -> {
+                reqAttributes.put(inv.getArgument(0), inv.getArgument(1));
+                return fakeRequest;
+              });
+
+      // Simulate the GOAWAY double-fire: 1st call acquires a permit; 2nd is 
the bug trigger.
+      queuedListener.onQueued(fakeRequest);
+      queuedListener.onQueued(fakeRequest);
+
+      Result fakeResult = Mockito.mock(Result.class);
+      Mockito.when(fakeResult.getRequest()).thenReturn(fakeRequest);
+      // Only one onComplete fires for the logical request (regardless of 
internal retries).
+      completeListener.onComplete(fakeResult);
+
+      int permitsAfter = testClient.asyncTrackerAvailablePermits();
+      log.info("Permits after double-queued + single complete: {}/{}", 
permitsAfter, maxPermits);
+
+      assertEquals(
+          "BUG (Jetty HTTP/2 GOAWAY retry permit leak): onRequestQueued fired 
twice for the"
+              + " same Request object but onComplete fired only once. The 
second acquire()"
+              + " was not matched by a release(), permanently leaking one 
permit per"
+              + " occurrence. In production this causes gradual semaphore 
depletion over"
+              + " hours/days until Pattern B IO-thread deadlock triggers.",
+          maxPermits,
+          permitsAfter);
+
+    } finally {
+      // Force-terminate the Phaser as a safety net; without the fix the 
phaser would be unbalanced.
+      try {
+        Field phaserField = asyncTrackerClass.getDeclaredField("phaser");
+        phaserField.setAccessible(true);
+        Phaser phaser = (Phaser) phaserField.get(asyncTracker);
+        phaser.forceTermination();
+      } catch (Exception ignored) {
+        log.debug("Failed to force-terminate Phaser");
+      }
+
+      try {
+        testClient.close();
+      } catch (Exception ignored) {
+        log.debug("Failed to close HttpJettySolrClient");
+      }
+    }
+  }
+
+  /**
+   * A minimal fake TCP server that accepts a fixed number of connections and 
holds them open,
+   * allowing tests to simulate connection-level failures by RST-ing all 
sockets at once.
+   *
+   * <p>Implements {@link AutoCloseable} so that the server socket and any 
open connections are
+   * always cleaned up when used in a try-with-resources block, even if the 
test fails or throws.
+   */
+  private static class FakeTcpServer implements AutoCloseable {
+    private final ServerSocket serverSocket;
+    private final List<Socket> connections = Collections.synchronizedList(new 
ArrayList<>());
+    private final CountDownLatch allConnected;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    FakeTcpServer(int expectedConnections) throws IOException {
+      this.serverSocket = new ServerSocket(0);
+      this.allConnected = new CountDownLatch(expectedConnections);
+      Thread acceptThread =
+          new Thread(
+              () -> {
+                try {
+                  while (connections.size() < expectedConnections && 
!serverSocket.isClosed()) {
+                    Socket s = serverSocket.accept();
+                    connections.add(s);
+                    allConnected.countDown();
+                  }
+                } catch (IOException ioe) {
+                  log.warn("Failed to accept connection", ioe);
+                }
+              },
+              "fake-tcp-server");
+      acceptThread.setDaemon(true);
+      acceptThread.start();
+    }
+
+    /** Returns the base URL clients should connect to, e.g. {@code 
http://127.0.0.1:PORT/solr}. */
+    String baseUrl() {
+      return "http://127.0.0.1:" + serverSocket.getLocalPort() + "/solr";
+    }
+
+    /** Waits until all expected connections have been accepted. */
+    boolean awaitAllConnected(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return allConnected.await(timeout, unit);
+    }
+
+    /** Returns the number of connections accepted so far. */
+    int connectionCount() {
+      return connections.size();
+    }
+
+    /**
+     * Closes all accepted connections with TCP RST, triggering onFailure on 
the Jetty IO thread.
+     */
+    void rstAll() {
+      for (Socket s : connections) {
+        try {
+          s.setSoLinger(true, 0); // send RST instead of FIN
+          s.close();
+        } catch (IOException ignored) {
+          log.debug("Failed to close connection");
+        }
+      }
+    }
+
+    /**
+     * RSTs any remaining open connections and closes the server socket, 
stopping the accept thread.
+     * Safe to call multiple times.
+     */
+    @Override
+    public void close() {
+      if (closed.compareAndSet(false, true)) {
+        rstAll();
+        try {
+          serverSocket.close();
+        } catch (IOException ignored) {
+          log.debug("Failed to close server socket");
+        }
+      }
+    }
+  }
+}
diff --git 
a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc 
b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc
index cfc2149d899..dab7bdd5b69 100644
--- a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc
+++ b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc
@@ -104,6 +104,8 @@ NOTE: Properties marked with "!" indicate inverted meaning 
between pre Solr 10 a
 
 |solr.solrj.http.cookies.enabled|!solr.http.disableCookies| false |If 
Http2SolrClient should support cookies.
 
+|solr.solrj.http.jetty.async_requests.max||1000|Maximum number of outstanding 
async HTTP requests allowed concurrently in the Jetty-based SolrJ HTTP client. 
Increase if you observe semaphore exhaustion under heavy distributed query 
load; decrease to limit resource usage. Related metric: 
`solr.client.request.async_permits`.
+
 |solr.solrj.http.jetty.customizer|solr.httpclient.builder.factory||A class 
loaded to customize HttpJettySolrClient upon creation.
 
 
|solr.streamingexpressions.facet.tiered.enabled|solr.facet.stream.tiered|true|Controls
 whether tiered faceting is enabled for streaming expressions.
diff --git 
a/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc 
b/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc
index ea074b2ace2..15a2d592f87 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc
@@ -124,6 +124,17 @@ The `Overseer Registry` is initialized when running in 
SolrCloud mode and includ
 
 * Size of the Overseer queues (collection work queue and cluster state update 
queue)
 
+=== HTTP Client Registry
+
+Solr exposes metrics for the internal Jetty-based HTTP client used for 
distributed (shard) requests:
+
+[cols="2,1,3",options="header"]
+|===
+| Prometheus Metric Name | Type | Description
+| `solr_client_request_async_permits{state="max"}` | gauge | Configured 
maximum number of outstanding concurrent async HTTP requests (controlled by 
`solr.solrj.http.jetty.async_requests.max`, default 1000).
+| `solr_client_request_async_permits{state="available"}` | gauge | Number of 
async request permits currently available (i.e., not in use). When this 
approaches zero, new distributed requests will block waiting for a permit.
+|===
+
 == Core Level Metrics
 
 === Index Merge Metrics
diff --git 
a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc 
b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc
index 3a250b69060..4954860be11 100644
--- 
a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc
+++ 
b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc
@@ -335,3 +335,11 @@ If you require the ability to roll back, back up your 
indexes before upgrading.
 === Docker
 
 The `gosu` binary is no longer installed in the Solr Docker image. See 
https://github.com/tianon/gosu[gosu github page] for alternatives, such as 
`runuser`, `setpriv` or `chroot`.
+
+=== Max distributed requests now configurable
+
+The internal HTTP client used for distributed shard sub-requests previously 
had a hard-coded limit of 1000 concurrent async requests per node.
+In large clusters, a single query can fan out to hundreds of sub-requests, 
quickly exhausting this limit and causing requests to queue, potentially 
leading to stalls or timeouts.
+This limit is now configurable via the system property `solr.solrj.http.jetty.async_requests.max`; the default remains 1000.
+
+Current permit utilization can be monitored via the 
`solr_client_request_async_permits` metric (see 
xref:deployment-guide:metrics-reporting.adoc#http-client-registry[HTTP Client 
Registry]).
diff --git 
a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java
 
b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java
index cac90ba4670..05491bb8e69 100644
--- 
a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java
+++ 
b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -110,6 +111,10 @@ public class HttpJettySolrClient extends 
HttpSolrClientBase {
    */
   public static final String CLIENT_CUSTOMIZER_SYSPROP = 
"solr.solrj.http.jetty.customizer";
 
+  /** System property to cap the maximum number of outstanding async HTTP 
requests. Default 1000. */
+  public static final String ASYNC_REQUESTS_MAX_SYSPROP =
+      "solr.solrj.http.jetty.async_requests.max";
+
   public static final String REQ_PRINCIPAL_KEY = "solr-req-principal";
   private static final String USER_AGENT =
       "Solr[" + MethodHandles.lookup().lookupClass().getName() + "] " + 
SolrVersion.LATEST_STRING;
@@ -439,7 +444,17 @@ public class HttpJettySolrClient extends 
HttpSolrClientBase {
           @Override
           public void onFailure(Response response, Throwable failure) {
             super.onFailure(response, failure);
-            future.completeExceptionally(new 
SolrServerException(failure.getMessage(), failure));
+            // Dispatch off the IO thread to avoid blocking 
semaphore.acquire() on retry.
+            // Fall back to IO thread if executor rejects 
(shutdown/overloaded).
+            SolrServerException ex = new 
SolrServerException(failure.getMessage(), failure);
+            try {
+              executor.execute(() -> future.completeExceptionally(ex));
+            } catch (RejectedExecutionException ree) {
+              log.warn(
+                  "Failed to complete future exceptionally due to executor 
rejection, completing on IO thread.",
+                  ree);
+              future.completeExceptionally(ex);
+            }
           }
         });
 
@@ -834,7 +849,21 @@ public class HttpJettySolrClient extends 
HttpSolrClientBase {
   }
 
   private static class AsyncTracker {
-    private static final int MAX_OUTSTANDING_REQUESTS = 1000;
+    /**
+     * Read per-instance so that tests can set the sysprop before constructing 
a client and have it
+     * take effect without relying on class-load ordering across test suites 
in the same JVM.
+     */
+    private final int maxOutstandingRequests;
+
+    /**
+     * Request attribute key used to guard idempotency across both listeners. 
Set immediately after
+     * {@code phaser.register()} — before {@code available.acquire()} — so 
that {@code onComplete}
+     * can never fire between registration and attribute-set and leave a 
phaser party stranded.
+     * Jetty can re-fire {@code onRequestQueued} for the same exchange (e.g. 
after a GOAWAY retry);
+     * the attribute makes the second call a no-op. {@code onComplete} always 
fires exactly once and
+     * uses the attribute to call {@code arriveAndDeregister()} + {@code 
release()} exactly once.
+     */
+    private static final String PERMIT_ACQUIRED_ATTR = 
"solr.async_tracker.permit_acquired";
 
     // wait for async requests
     private final Phaser phaser;
@@ -845,35 +874,73 @@ public class HttpJettySolrClient extends 
HttpSolrClientBase {
 
     AsyncTracker() {
       // TODO: what about shared instances?
+      maxOutstandingRequests = 
EnvUtils.getPropertyAsInteger(ASYNC_REQUESTS_MAX_SYSPROP, 1000);
       phaser = new Phaser(1);
-      available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
+      available = new Semaphore(maxOutstandingRequests, false);
       queuedListener =
           request -> {
+            if (request.getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) {
+              return;
+            }
             phaser.register();
+            // Set the attribute before acquire() so onComplete can never race 
between
+            // phaser.register() and attribute-set, which would strand a 
phaser party forever.
+            request.attribute(PERMIT_ACQUIRED_ATTR, Boolean.TRUE);
             try {
               available.acquire();
-            } catch (InterruptedException ignored) {
-
+            } catch (InterruptedException e) {
+              // completeListener will call arriveAndDeregister() when 
onComplete fires.
+              Thread.currentThread().interrupt();
             }
           };
       completeListener =
           result -> {
-            phaser.arriveAndDeregister();
-            available.release();
+            if (result != null
+                && 
result.getRequest().getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) {
+              phaser.arriveAndDeregister();
+              available.release();
+            }
           };
     }
 
     int getMaxRequestsQueuedPerDestination() {
       // comfortably above max outstanding requests
-      return MAX_OUTSTANDING_REQUESTS * 3;
+      return maxOutstandingRequests * 3;
+    }
+
+    int maxPermits() {
+      return maxOutstandingRequests;
+    }
+
+    int availablePermits() {
+      return available.availablePermits();
     }
 
     public void waitForComplete() {
-      phaser.arriveAndAwaitAdvance();
+      // Use awaitAdvanceInterruptibly() instead of arriveAndAwaitAdvance() so 
that
+      // ExecutorUtil.shutdownNow() can unblock this during container shutdown.
+      int phase = phaser.arrive();
+      try {
+        phaser.awaitAdvanceInterruptibly(phase);
+      } catch (InterruptedException e) {
+        // Terminate phaser on interrupt so in-flight onComplete callbacks 
don't stall.
+        phaser.forceTermination();
+        Thread.currentThread().interrupt();
+      }
       phaser.arriveAndDeregister();
     }
   }
 
+  /** Returns the configured maximum number of outstanding async requests. */
+  public int asyncTrackerMaxPermits() {
+    return asyncTracker.maxPermits();
+  }
+
+  /** Returns the number of currently available async-request permits. */
+  public int asyncTrackerAvailablePermits() {
+    return asyncTracker.availablePermits();
+  }
+
   public static class Builder
       extends HttpSolrClientBuilderBase<HttpJettySolrClient.Builder, 
HttpJettySolrClient> {
 

Reply via email to