janhoy commented on code in PR #4236:
URL: https://github.com/apache/solr/pull/4236#discussion_r3078932675


##########
solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.util.SuppressForbidden;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
+import org.apache.solr.client.solrj.jetty.LBJettySolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.eclipse.jetty.client.Request;
+import org.eclipse.jetty.client.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reproduces the {@link org.apache.solr.client.solrj.impl.LBAsyncSolrClient} semaphore leak that
+ * causes distributed queries to hang permanently.
+ *
+ * <h3>Bug scenario</h3>
+ *
+ * <ol>
+ *   <li>A shard HTTP request fails with a <em>connection-level</em> error 
(not an HTTP-level
+ *       error). Jetty fires the {@code onFailure} response callback directly 
on the IO selector
+ *       thread.
+ *   <li>{@link org.apache.solr.client.solrj.jetty.HttpJettySolrClient#requestAsync} completes its
+ *       {@code CompletableFuture} exceptionally from within that {@code onFailure} callback — still
+ *       on the IO thread.
+ *   <li>{@code LBAsyncSolrClient.doAsyncRequest} registered a {@code 
whenComplete} on that future.
+ *       Because the future completes on the IO thread, {@code whenComplete} 
also fires
+ *       <em>synchronously on the IO thread</em>.
+ *   <li>The {@code whenComplete} action calls {@code doAsyncRequest} again 
(retry to the next
+ *       endpoint), which eventually calls Jetty's {@code HttpClient.send()}. 
That triggers the
+ *       {@code AsyncTracker.queuedListener} — which calls {@code 
semaphore.acquire()} — still on
+ *       the IO thread, before the original request's {@code 
completeListener.onComplete()} has had
+ *       a chance to call {@code semaphore.release()}.
+ *   <li>If the semaphore is at zero, {@code acquire()} <em>blocks the IO 
thread</em>. The blocked
+ *       IO thread cannot execute the {@code completeListener} that would 
release the original
+ *       permit. The permit is permanently leaked, and the IO thread is 
permanently stuck. Repeat
+ *       until all permits are exhausted: distributed queries hang forever.
+ * </ol>
+ *
+ * <h3>Test setup</h3>
+ *
+ * <p>A raw TCP server accepts {@value #NUM_RETRY_REQUESTS} connections and 
holds them open until
+ * all are established (so all semaphore permits are consumed). It then closes 
all connections
+ * simultaneously via TCP RST, causing all Jetty {@code onFailure} events to 
fire on the IO threads
+ * at the same time. Because the semaphore is already at 0, every retry's 
{@code acquire()} blocks
+ * the IO thread immediately, and no {@code onComplete} release can fire.
+ *
+ * <p>The test asserts that after a short wait the semaphore is at 0 and none 
of the {@code
+ * CompletableFuture}s returned by {@code requestAsync} have completed — 
proving the permanent
+ * permit exhaustion.
+ */
+public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String COLLECTION = "semaphore_leak_test";
+
+  /** Reduced semaphore size so we can observe the drain without needing 
thousands of requests. */
+  private static final int MAX_PERMITS = 40;
+
+  /**
+   * Number of concurrent requests. Set equal to MAX_PERMITS so that all 
permits are exhausted
+   * before any retry can acquire, triggering the IO-thread deadlock.
+   */
+  private static final int NUM_RETRY_REQUESTS = MAX_PERMITS;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    // Reduce the semaphore size so we can observe drain with few requests.
+    // This property is read when HttpJettySolrClient is constructed, so it 
must
+    // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up.
+    System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, 
String.valueOf(MAX_PERMITS));
+
+    configureCluster(2).addConfig("conf", 
configset("cloud-dynamic")).configure();
+
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
+        .process(cluster.getSolrClient());
+
+    waitForState(
+        "Expected 1 active shard with 1 replica",
+        COLLECTION,
+        (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c, 
2, 1));
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP);
+  }
+
+  /**
+   * Demonstrates the permanent IO-thread deadlock caused by {@link
+   * org.apache.solr.client.solrj.impl.LBAsyncSolrClient} retrying a request 
synchronously inside a
+   * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO 
selector thread.
+   *
+   * <p>This assertion <b>FAILS</b> with the current code, demonstrating the 
bug. After a fix (e.g.
+   * dispatching the retry to an executor thread instead of running it 
synchronously on the IO
+   * thread), the retries proceed on executor threads, the IO threads remain 
free to fire {@code
+   * onComplete → release()}, and all futures eventually complete via the real 
server.
+   */
+  @Test
+  public void testSemaphoreLeakOnLBRetry() throws Exception {
+    // Create a dedicated HttpJettySolrClient for this test so that if the IO 
threads are
+    // permanently deadlocked they don't affect the cluster's shared client.
+    // The system property is still set to MAX_PERMITS from setupCluster().
+    HttpJettySolrClient testClient =
+        new HttpJettySolrClient.Builder()
+            .withConnectionTimeout(5, TimeUnit.SECONDS)
+            .withIdleTimeout(30, TimeUnit.SECONDS)
+            .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP 
connection
+            .build();
+
+    // Fake TCP server: accepts exactly NUM_RETRY_REQUESTS connections and 
holds them open.
+    // Once all are established (semaphore exhausted), closes all with RST 
simultaneously.
+    ServerSocket fakeServer = new ServerSocket(0);
+    CountDownLatch allConnected = new CountDownLatch(NUM_RETRY_REQUESTS);
+    List<Socket> fakeConnections = Collections.synchronizedList(new 
ArrayList<>());
+
+    Thread fakeServerThread =
+        new Thread(
+            () -> {
+              try {
+                while (fakeConnections.size() < NUM_RETRY_REQUESTS && 
!fakeServer.isClosed()) {
+                  Socket s = fakeServer.accept();
+                  fakeConnections.add(s);
+                  allConnected.countDown();
+                }
+              } catch (IOException ignored) {
+              }
+            },
+            "fake-tcp-server");
+    fakeServerThread.setDaemon(true);
+    fakeServerThread.start();
+
+    String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() + "/solr";
+    String realBaseUrl =
+        cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + 
COLLECTION;
+
+    List<CompletableFuture<LBSolrClient.Rsp>> futures = new ArrayList<>();
+
+    try (LBJettySolrClient lbClient = new 
LBJettySolrClient.Builder(testClient).build()) {
+
+      assertEquals(
+          "All permits should be available before the test (verifies sysprop 
was applied)",
+          MAX_PERMITS,
+          testClient.asyncTrackerAvailablePermits());
+
+      // Submit NUM_RETRY_REQUESTS async requests.
+      // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl 
(second/retry).
+      // Each requestAsync() call acquires a semaphore permit synchronously 
during send().
+      // After NUM_RETRY_REQUESTS calls, the semaphore is at 0.
+      for (int i = 0; i < NUM_RETRY_REQUESTS; i++) {
+        ModifiableSolrParams qparams = new ModifiableSolrParams();
+        qparams.set("q", "*:*");
+        QueryRequest qr = new QueryRequest(qparams);
+        LBSolrClient.Req req =
+            new LBSolrClient.Req(
+                qr,
+                List.of(
+                    new LBSolrClient.Endpoint(fakeBaseUrl),
+                    new LBSolrClient.Endpoint(realBaseUrl)));
+        futures.add(lbClient.requestAsync(req));
+      }
+
+      log.info(
+          "Queued {} requests (semaphore now at 0). Waiting for all TCP 
connections...",
+          NUM_RETRY_REQUESTS);
+
+      // Wait until the fake server has accepted all NUM_RETRY_REQUESTS 
connections.
+      // At this point all semaphore permits are consumed and no onComplete 
has fired yet.
+      assertTrue(
+          "All "
+              + NUM_RETRY_REQUESTS
+              + " connections should be established within 15 s, but only "
+              + (NUM_RETRY_REQUESTS - allConnected.getCount())
+              + " were.",
+          allConnected.await(15, TimeUnit.SECONDS));
+
+      assertEquals(
+          "Semaphore should be fully consumed after queuing all requests",
+          0,
+          testClient.asyncTrackerAvailablePermits());
+
+      // Close all fake connections simultaneously with TCP RST.
+      // This fires Jetty's onFailure callback on the IO selector thread for 
each request.
+      // The onFailure path → future.completeExceptionally() → whenComplete 
fires synchronously
+      // on the IO thread → LBAsyncSolrClient.doAsyncRequest (retry) → send() →
+      // onRequestQueued → semaphore.acquire() → BLOCKS (semaphore = 0) → IO 
thread stuck.
+      int connCount = fakeConnections.size();
+      log.info("Closing {} fake connections via RST...", connCount);
+      for (Socket s : fakeConnections) {
+        try {
+          s.setSoLinger(true, 0); // send RST instead of FIN
+          s.close();
+        } catch (IOException ignored) {
+        }
+      }
+
+      // Give IO threads time to process the failure events and attempt the 
retry acquires.
+      Thread.sleep(2000);
+
+      int permitsAfterFailures = testClient.asyncTrackerAvailablePermits();
+      long completedCount = 
futures.stream().filter(CompletableFuture::isDone).count();
+      log.info(
+          "Permits after 2s: {}/{}; futures completed: {}/{}",
+          permitsAfterFailures,
+          MAX_PERMITS,
+          completedCount,
+          NUM_RETRY_REQUESTS);
+
+      // With the bug: the IO threads are deadlocked. Permits remain at 0 and 
no future completes.
+      // This assertion FAILS with the current code, demonstrating the bug.
+      assertEquals(
+          "BUG (LBAsyncSolrClient retry leak): all "
+              + NUM_RETRY_REQUESTS
+              + " semaphore permits should be released once the retries 
complete on the"
+              + " real server. Instead the IO threads are permanently blocked 
in"
+              + " semaphore.acquire() because the retry fires synchronously on 
the IO thread"
+              + " before the original request's completeListener can call 
release().",
+          MAX_PERMITS,
+          permitsAfterFailures);

Review Comment:
   Implemented



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to