This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 4.0.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/4.0.x-fixes by this push: new a826a6e841 CXF-8951: Concurrent WebClient usage causes massive thread overhead (the case when single instance of WebClient is used from many threads) (#1850) a826a6e841 is described below commit a826a6e841d9fb0f89369e7539ef7c63a587a1cb Author: Andriy Redko <drr...@gmail.com> AuthorDate: Wed Jun 5 09:40:48 2024 -0400 CXF-8951: Concurrent WebClient usage causes massive thread overhead (the case when single instance of WebClient is used from many threads) (#1850) (cherry picked from commit b897c0e79d9f003c1a68c89ab49f652664cb7001) --- .../cxf/transport/http/HttpClientHTTPConduit.java | 55 +++++++++++++--------- .../systest/jaxrs/JAXRS20ClientServerBookTest.java | 46 +++++++++++++++++- 2 files changed, 79 insertions(+), 22 deletions(-) diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index 6cc533931b..0c7be9e190 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -105,6 +105,7 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { volatile RefCount<HttpClient> clientRef; volatile int lastTlsHash = -1; volatile URI sslURL; + private final ReentrantLock initializationLock = new ReentrantLock(); private static final class RefCount<T extends HttpClient> { private final AtomicLong count = new AtomicLong(); @@ -404,31 +405,43 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { cb.version(Version.HTTP_1_1); } - final boolean shareHttpClient = MessageUtils.getContextualBoolean(message, SHARE_HTTPCLIENT_CONDUIT, true); - cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, csPolicy, clientParameters, () -> cb.build()); - if (!"https".equals(uri.getScheme()) - && !KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) - && cl.client().version() == Version.HTTP_2 - && ("2".equals(verc) || ("auto".equals(verc) && "2".equals(HTTP_VERSION)))) { - try { - // We specifically want HTTP2, but we're using a request - // that won't trigger an upgrade to HTTP/2 so we'll - // call OPTIONS on the URI which may trigger HTTP/2 upgrade. - // Not needed for methods that don't have a body (GET/HEAD/etc...) - // or for https (negotiated at the TLS level) - HttpRequest.Builder rb = HttpRequest.newBuilder() - .uri(uri) - .method("OPTIONS", BodyPublishers.noBody()); - cl.client().send(rb.build(), BodyHandlers.ofByteArray()); - } catch (IOException | InterruptedException e) { - // + // make sure the conduit is not yet initialized + initializationLock.lock(); + try { + cl = clientRef; + if (cl == null) { + final boolean shareHttpClient = MessageUtils.getContextualBoolean(message, + SHARE_HTTPCLIENT_CONDUIT, true); + cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, csPolicy, clientParameters, () -> cb.build()); + + if (!"https".equals(uri.getScheme()) + && !KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) + && cl.client().version() == Version.HTTP_2 + && ("2".equals(verc) || ("auto".equals(verc) && "2".equals(HTTP_VERSION)))) { + try { + // We specifically want HTTP2, but we're using a request + // that won't trigger an upgrade to HTTP/2 so we'll + // call OPTIONS on the URI which may trigger HTTP/2 upgrade. + // Not needed for methods that don't have a body (GET/HEAD/etc...) + // or for https (negotiated at the TLS level) + HttpRequest.Builder rb = HttpRequest.newBuilder() + .uri(uri) + .method("OPTIONS", BodyPublishers.noBody()); + cl.client().send(rb.build(), BodyHandlers.ofByteArray()); + } catch (IOException | InterruptedException e) { + // + } + } + + clientRef = cl; } - } - clientRef = cl; + } finally { + initializationLock.unlock(); + } } message.put(HttpClient.class, cl.client()); - message.put(KEY_HTTP_CONNECTION_ADDRESS, address); + message.put(KEY_HTTP_CONNECTION_ADDRESS, address); } @Override diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java index d76f3ef1e8..6a5075151b 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java @@ -153,7 +153,7 @@ public class JAXRS20ClientServerBookTest extends AbstractBusClientServerTestBase } @Test - public void testGetGenericBookParallel() throws InterruptedException { + public void testGetGenericBookManyClientsInParallel() throws InterruptedException { final ExecutorService pool = Executors.newFixedThreadPool(100); final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); final AtomicLong httpClientThreads = new AtomicLong(); @@ -203,6 +203,50 @@ public class JAXRS20ClientServerBookTest extends AbstractBusClientServerTestBase } } + @Test + public void testGetGenericBookSingleClientInParallel() throws InterruptedException { + final ExecutorService pool = Executors.newFixedThreadPool(100); + final String target = "http://localhost:" + PORT + "/bookstore/genericbooks/123"; + final WebClient client = WebClient.create(target, true); + final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + + final Supplier<Long> captureHttpClientThreads = () -> + Arrays + .stream(threadMXBean.getAllThreadIds()) + .mapToObj(id -> threadMXBean.getThreadInfo(id)) + .filter(Objects::nonNull) + .filter(t -> t.getThreadName().startsWith("HttpClient-")) + .count(); + + try { + final Collection<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < 100; ++i) { + // We are not checking the future completion, but the fact we were able + // to execute this amount of requests without blowing live threads set + futures.add( + pool.submit(() -> { + Book b1 = client.sync().get(Book.class); + assertEquals(124L, b1.getId()); + }) + ); + } + + // Find any completed future to make sure conduit was initialized + while (true) { + if (futures.stream().anyMatch(Future::isDone)) { + break; + } + } + } finally { + client.close(); + } + + pool.shutdown(); + assertThat(pool.awaitTermination(1, TimeUnit.MINUTES), is(true)); + + assertThat(captureHttpClientThreads.get(), equalTo(0L)); + } + @Test public void testGetGenericBook() throws Exception { String address = "http://localhost:" + PORT + "/bookstore/genericbooks/123";