janhoy commented on code in PR #4236: URL: https://github.com/apache/solr/pull/4236#discussion_r3078932675
########## solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java: ########## @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import org.apache.lucene.util.SuppressForbidden; +import org.apache.solr.client.solrj.impl.LBSolrClient; +import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; +import org.apache.solr.client.solrj.jetty.LBJettySolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.eclipse.jetty.client.Request; +import 
org.eclipse.jetty.client.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reproduces the {@link org.apache.solr.client.solrj.impl.LBAsyncSolrClient} semaphore leak that + * causes distributed queries to hang permanently. + * + * <h3>Bug scenario</h3> + * + * <ol> + * <li>A shard HTTP request fails with a <em>connection-level</em> error (not an HTTP-level + * error). Jetty fires the {@code onFailure} response callback directly on the IO selector + * thread. + * <li>{@link org.apache.solr.client.solrj.jetty.HttpJettySolrClient#requestAsync} completes its + * {@code CompletableFuture} exceptionally from within that {@code onFailure} callback — still + * on the IO thread. + * <li>{@code LBAsyncSolrClient.doAsyncRequest} registered a {@code whenComplete} on that future. + * Because the future completes on the IO thread, {@code whenComplete} also fires + * <em>synchronously on the IO thread</em>. + * <li>The {@code whenComplete} action calls {@code doAsyncRequest} again (retry to the next + * endpoint), which eventually calls Jetty's {@code HttpClient.send()}. That triggers the + * {@code AsyncTracker.queuedListener} — which calls {@code semaphore.acquire()} — still on + * the IO thread, before the original request's {@code completeListener.onComplete()} has had + * a chance to call {@code semaphore.release()}. + * <li>If the semaphore is at zero, {@code acquire()} <em>blocks the IO thread</em>. The blocked + * IO thread cannot execute the {@code completeListener} that would release the original + * permit. The permit is permanently leaked, and the IO thread is permanently stuck. Repeat + * until all permits are exhausted: distributed queries hang forever. 
+ * </ol> + * + * <h3>Test setup</h3> + * + * <p>A raw TCP server accepts {@value #NUM_RETRY_REQUESTS} connections and holds them open until + * all are established (so all semaphore permits are consumed). It then closes all connections + * simultaneously via TCP RST, causing all Jetty {@code onFailure} events to fire on the IO threads + * at the same time. Because the semaphore is already at 0, every retry's {@code acquire()} blocks + * the IO thread immediately, and no {@code onComplete} release can fire. + * + * <p>After a short wait, the test asserts that all semaphore permits have been released again, + * i.e. that the retries were able to complete against the real server. With the bug, the IO + * threads stay blocked in {@code acquire()}, the permits remain at 0, and the assertion fails — + * demonstrating the permanent permit exhaustion. + */ +public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String COLLECTION = "semaphore_leak_test"; + + /** Reduced semaphore size so we can observe the drain without needing thousands of requests. */ + private static final int MAX_PERMITS = 40; + + /** + * Number of concurrent requests. Set equal to MAX_PERMITS so that all permits are exhausted + * before any retry can acquire, triggering the IO-thread deadlock. + */ + private static final int NUM_RETRY_REQUESTS = MAX_PERMITS; + + @BeforeClass + public static void setupCluster() throws Exception { + // Reduce the semaphore size so we can observe drain with few requests. + // This property is read when HttpJettySolrClient is constructed, so it must + // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up.
+ System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, String.valueOf(MAX_PERMITS)); + + configureCluster(2).addConfig("conf", configset("cloud-dynamic")).configure(); + + CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1) + .process(cluster.getSolrClient()); + + waitForState( + "Expected 1 active shard with 1 replica", + COLLECTION, + (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c, 2, 1)); + } + + @AfterClass + public static void cleanup() { + System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); + } + + /** + * Demonstrates the permanent IO-thread deadlock caused by {@link + * org.apache.solr.client.solrj.impl.LBAsyncSolrClient} retrying a request synchronously inside a + * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO selector thread. + * + * <p>This assertion <b>FAILS</b> with the current code, demonstrating the bug. After a fix (e.g. + * dispatching the retry to an executor thread instead of running it synchronously on the IO + * thread), the retries proceed on executor threads, the IO threads remain free to fire {@code + * onComplete → release()}, and all futures eventually complete via the real server. + */ + @Test + public void testSemaphoreLeakOnLBRetry() throws Exception { + // Create a dedicated HttpJettySolrClient for this test so that if the IO threads are + // permanently deadlocked they don't affect the cluster's shared client. + // The system property is still set to MAX_PERMITS from setupCluster(). + HttpJettySolrClient testClient = + new HttpJettySolrClient.Builder() + .withConnectionTimeout(5, TimeUnit.SECONDS) + .withIdleTimeout(30, TimeUnit.SECONDS) + .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP connection + .build(); + + // Fake TCP server: accepts exactly NUM_RETRY_REQUESTS connections and holds them open. + // Once all are established (semaphore exhausted), closes all with RST simultaneously. 
+ ServerSocket fakeServer = new ServerSocket(0); + CountDownLatch allConnected = new CountDownLatch(NUM_RETRY_REQUESTS); + List<Socket> fakeConnections = Collections.synchronizedList(new ArrayList<>()); + + Thread fakeServerThread = + new Thread( + () -> { + try { + while (fakeConnections.size() < NUM_RETRY_REQUESTS && !fakeServer.isClosed()) { + Socket s = fakeServer.accept(); + fakeConnections.add(s); + allConnected.countDown(); + } + } catch (IOException ignored) { + } + }, + "fake-tcp-server"); + fakeServerThread.setDaemon(true); + fakeServerThread.start(); + + String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() + "/solr"; + String realBaseUrl = + cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION; + + List<CompletableFuture<LBSolrClient.Rsp>> futures = new ArrayList<>(); + + try (LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { + + assertEquals( + "All permits should be available before the test (verifies sysprop was applied)", + MAX_PERMITS, + testClient.asyncTrackerAvailablePermits()); + + // Submit NUM_RETRY_REQUESTS async requests. + // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl (second/retry). + // Each requestAsync() call acquires a semaphore permit synchronously during send(). + // After NUM_RETRY_REQUESTS calls, the semaphore is at 0. + for (int i = 0; i < NUM_RETRY_REQUESTS; i++) { + ModifiableSolrParams qparams = new ModifiableSolrParams(); + qparams.set("q", "*:*"); + QueryRequest qr = new QueryRequest(qparams); + LBSolrClient.Req req = + new LBSolrClient.Req( + qr, + List.of( + new LBSolrClient.Endpoint(fakeBaseUrl), + new LBSolrClient.Endpoint(realBaseUrl))); + futures.add(lbClient.requestAsync(req)); + } + + log.info( + "Queued {} requests (semaphore now at 0). Waiting for all TCP connections...", + NUM_RETRY_REQUESTS); + + // Wait until the fake server has accepted all NUM_RETRY_REQUESTS connections. 
+ // At this point all semaphore permits are consumed and no onComplete has fired yet. + assertTrue( + "All " + + NUM_RETRY_REQUESTS + + " connections should be established within 15 s, but only " + + (NUM_RETRY_REQUESTS - allConnected.getCount()) + + " were.", + allConnected.await(15, TimeUnit.SECONDS)); + + assertEquals( + "Semaphore should be fully consumed after queuing all requests", + 0, + testClient.asyncTrackerAvailablePermits()); + + // Close all fake connections simultaneously with TCP RST. + // This fires Jetty's onFailure callback on the IO selector thread for each request. + // The onFailure path → future.completeExceptionally() → whenComplete fires synchronously + // on the IO thread → LBAsyncSolrClient.doAsyncRequest (retry) → send() → + // onRequestQueued → semaphore.acquire() → BLOCKS (semaphore = 0) → IO thread stuck. + int connCount = fakeConnections.size(); + log.info("Closing {} fake connections via RST...", connCount); + for (Socket s : fakeConnections) { + try { + s.setSoLinger(true, 0); // send RST instead of FIN + s.close(); + } catch (IOException ignored) { + } + } + + // Give IO threads time to process the failure events and attempt the retry acquires. + Thread.sleep(2000); + + int permitsAfterFailures = testClient.asyncTrackerAvailablePermits(); + long completedCount = futures.stream().filter(CompletableFuture::isDone).count(); + log.info( + "Permits after 2s: {}/{}; futures completed: {}/{}", + permitsAfterFailures, + MAX_PERMITS, + completedCount, + NUM_RETRY_REQUESTS); + + // With the bug: the IO threads are deadlocked. Permits remain at 0 and no future completes. + // This assertion FAILS with the current code, demonstrating the bug. + assertEquals( + "BUG (LBAsyncSolrClient retry leak): all " + + NUM_RETRY_REQUESTS + + " semaphore permits should be released once the retries complete on the" + + " real server. 
Instead the IO threads are permanently blocked in" + + " semaphore.acquire() because the retry fires synchronously on the IO thread" + + " before the original request's completeListener can call release().", + MAX_PERMITS, + permitsAfterFailures); Review Comment: Implemented -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
