This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/main by this push:
     new b897c0e79d CXF-8951: Concurrent WebClient usage causes massive thread 
overhead (the case when single instance of WebClient is used from many threads) 
(#1850)
b897c0e79d is described below

commit b897c0e79d9f003c1a68c89ab49f652664cb7001
Author: Andriy Redko <drr...@gmail.com>
AuthorDate: Wed Jun 5 09:40:48 2024 -0400

    CXF-8951: Concurrent WebClient usage causes massive thread overhead (the 
case when single instance of WebClient is used from many threads) (#1850)
---
 .../cxf/transport/http/HttpClientHTTPConduit.java  | 55 +++++++++++++---------
 .../systest/jaxrs/JAXRS20ClientServerBookTest.java | 46 +++++++++++++++++-
 2 files changed, 79 insertions(+), 22 deletions(-)

diff --git 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
index 6cc533931b..0c7be9e190 100644
--- 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
+++ 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
@@ -105,6 +105,7 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
     volatile RefCount<HttpClient> clientRef;
     volatile int lastTlsHash = -1;
     volatile URI sslURL;
+    private final ReentrantLock initializationLock = new ReentrantLock();
     
     private static final class RefCount<T extends HttpClient> {
         private final AtomicLong count = new AtomicLong();
@@ -404,31 +405,43 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
                 cb.version(Version.HTTP_1_1);  
             }
 
-            final boolean shareHttpClient = 
MessageUtils.getContextualBoolean(message, SHARE_HTTPCLIENT_CONDUIT, true);
-            cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, csPolicy, 
clientParameters, () -> cb.build());
-            if (!"https".equals(uri.getScheme()) 
-                && 
!KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
-                && cl.client().version() == Version.HTTP_2
-                && ("2".equals(verc) || ("auto".equals(verc) && 
"2".equals(HTTP_VERSION)))) {
-                try {
-                    // We specifically want HTTP2, but we're using a request
-                    // that won't trigger an upgrade to HTTP/2 so we'll
-                    // call OPTIONS on the URI which may trigger HTTP/2 
upgrade.
-                    // Not needed for methods that don't have a body 
(GET/HEAD/etc...) 
-                    // or for https (negotiated at the TLS level)
-                    HttpRequest.Builder rb = HttpRequest.newBuilder()
-                        .uri(uri)
-                        .method("OPTIONS", BodyPublishers.noBody());
-                    cl.client().send(rb.build(), BodyHandlers.ofByteArray());
-                } catch (IOException | InterruptedException e) {
-                    //
+            // make sure the conduit is not yet initialized
+            initializationLock.lock();
+            try {
+                cl = clientRef;
+                if (cl == null) {
+                    final boolean shareHttpClient = 
MessageUtils.getContextualBoolean(message,
+                        SHARE_HTTPCLIENT_CONDUIT, true);
+                    cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, 
csPolicy, clientParameters, () -> cb.build());
+    
+                    if (!"https".equals(uri.getScheme()) 
+                        && 
!KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
+                        && cl.client().version() == Version.HTTP_2
+                        && ("2".equals(verc) || ("auto".equals(verc) && 
"2".equals(HTTP_VERSION)))) {
+                        try {
+                            // We specifically want HTTP2, but we're using a 
request
+                            // that won't trigger an upgrade to HTTP/2 so we'll
+                            // call OPTIONS on the URI which may trigger 
HTTP/2 upgrade.
+                            // Not needed for methods that don't have a body 
(GET/HEAD/etc...) 
+                            // or for https (negotiated at the TLS level)
+                            HttpRequest.Builder rb = HttpRequest.newBuilder()
+                                .uri(uri)
+                                .method("OPTIONS", BodyPublishers.noBody());
+                            cl.client().send(rb.build(), 
BodyHandlers.ofByteArray());
+                        } catch (IOException | InterruptedException e) {
+                            //
+                        }
+                    } 
+
+                    clientRef = cl;
                 }
-            } 
-            clientRef = cl;
+            } finally {
+                initializationLock.unlock();
+            }
         }
         message.put(HttpClient.class, cl.client());
         
-        message.put(KEY_HTTP_CONNECTION_ADDRESS, address);        
+        message.put(KEY_HTTP_CONNECTION_ADDRESS, address);
     }
 
     @Override
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
index d76f3ef1e8..6a5075151b 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
@@ -153,7 +153,7 @@ public class JAXRS20ClientServerBookTest extends 
AbstractBusClientServerTestBase
     }
 
     @Test
-    public void testGetGenericBookParallel() throws InterruptedException {
+    public void testGetGenericBookManyClientsInParallel() throws 
InterruptedException {
         final ExecutorService pool = Executors.newFixedThreadPool(100);
         final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
         final AtomicLong httpClientThreads = new AtomicLong(); 
@@ -203,6 +203,50 @@ public class JAXRS20ClientServerBookTest extends 
AbstractBusClientServerTestBase
         }
     }
 
+    @Test
+    public void testGetGenericBookSingleClientInParallel() throws 
InterruptedException {
+        final ExecutorService pool = Executors.newFixedThreadPool(100);
+        final String target = "http://localhost:"; + PORT + 
"/bookstore/genericbooks/123";
+        final WebClient client = WebClient.create(target, true);
+        final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+        final Supplier<Long> captureHttpClientThreads = () ->
+            Arrays
+                .stream(threadMXBean.getAllThreadIds())
+                .mapToObj(id -> threadMXBean.getThreadInfo(id))
+                .filter(Objects::nonNull)
+                .filter(t -> t.getThreadName().startsWith("HttpClient-"))
+                .count();
+
+        try {
+            final Collection<Future<?>> futures = new ArrayList<>(); 
+            for (int i = 0; i < 100; ++i) {
+                // We are not checking the future completion, but the fact we 
were able 
+                // to execute this amount of requests without blowing live 
threads set 
+                futures.add(
+                    pool.submit(() -> {
+                        Book b1 = client.sync().get(Book.class);
+                        assertEquals(124L, b1.getId());
+                    })
+                );
+            }
+
+            // Find any completed future to make sure conduit was initialized
+            while (true) {
+                if (futures.stream().anyMatch(Future::isDone)) {
+                    break;
+                }
+            }
+        } finally {
+            client.close();
+        }
+
+        pool.shutdown();
+        assertThat(pool.awaitTermination(1, TimeUnit.MINUTES), is(true));
+
+        assertThat(captureHttpClientThreads.get(), equalTo(0L));
+    }
+
     @Test
     public void testGetGenericBook() throws Exception {
         String address = "http://localhost:"; + PORT + 
"/bookstore/genericbooks/123";

Reply via email to