This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 4.0.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit b022662c7cece750534769abe83a6d5ce662cf17
Author: Andriy Redko <drr...@gmail.com>
AuthorDate: Fri Apr 19 10:30:00 2024 -0400

    CXF-8951: Concurrent WebClient usage causes massive thread overhead (#1777)
    
    Signed-off-by: Andriy Redko <drr...@gmail.com>
    (cherry picked from commit a70abba613c3c95cd8891384f6992fd6da2b935b)
---
 .../cxf/transport/http/HttpClientHTTPConduit.java  | 202 ++++++++++++++++-----
 .../systest/jaxrs/JAXRS20ClientServerBookTest.java |  65 +++++++
 .../systest/https/conduit/HTTPSConduitTest.java    |   9 +
 3 files changed, 232 insertions(+), 44 deletions(-)

diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
index b25777eeb3..4435bf01e8 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
@@ -49,11 +49,14 @@ import java.nio.channels.UnresolvedAddressException;
 import java.security.AccessController;
 import java.security.GeneralSecurityException;
 import java.security.Principal;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.security.cert.Certificate;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +69,8 @@ import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
 import javax.net.ssl.SSLContext;
@@ -82,6 +87,7 @@ import org.apache.cxf.io.CacheAndWriteOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.policy.impl.ClientPolicyCalculator;
 import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
 import org.apache.cxf.transport.https.SSLUtils;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
@@ -89,10 +95,113 @@ import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
 
 public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
+    private static final String FORCE_URLCONNECTION_HTTP_CONDUIT = 
"force.urlconnection.http.conduit";
+    private static final String SHARE_HTTPCLIENT_CONDUIT = 
"share.httpclient.http.conduit";
+
     private static final Set<String> RESTRICTED_HEADERS = 
getRestrictedHeaders();
-    volatile HttpClient client;
+    private static final HttpClientCache CLIENTS_CACHE = new HttpClientCache();
+    volatile RefCount<HttpClient> clientRef;
     volatile int lastTlsHash = -1;
     volatile URI sslURL;
+    
+    private static final class RefCount<T extends HttpClient> {
+        private final AtomicLong count = new AtomicLong();
+        private final TLSClientParameters clientParameters;
+        private final HTTPClientPolicy policy;
+        private final T client;
+        private final Runnable finalizer;
+
+        RefCount(T client, HTTPClientPolicy policy, TLSClientParameters 
clientParameters, Runnable finalizer) {
+            this.client = client;
+            this.policy = policy;
+            this.clientParameters = clientParameters;
+            this.finalizer = finalizer;
+        }
+        
+        RefCount<T> acquire() {
+            count.incrementAndGet();
+            return this;
+        }
+        
+        void release() {
+            if (count.decrementAndGet() == 0) {
+                finalizer.run();
+
+                if (client instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable)client).close();
+                    } catch (Exception e) {
+                        //ignore
+                    }                
+                } else if (client != null) {
+                    tryToShutdownSelector(client);
+                }
+            }
+        }
+
+        HttpClient client() {
+            return client;
+        }
+
+        HTTPClientPolicy policy() {
+            return policy;
+        }
+
+        public TLSClientParameters clientParameters() {
+            return clientParameters;
+        }
+    }
+    
+    private static final class HttpClientCache {
+        private static final int MAX_SIZE = 100; // Keeping at most 100 clients
+
+        private final List<RefCount<HttpClient>> clients = new ArrayList<>();
+        private final ClientPolicyCalculator cpc = new 
ClientPolicyCalculator();
+        private final ReentrantLock lock = new ReentrantLock();
+
+        RefCount<HttpClient> computeIfAbsent(final boolean shareHttpClient, 
final HTTPClientPolicy policy, 
+                final TLSClientParameters clientParameters, final 
Supplier<HttpClient> supplier) {
+            
+            // Do not share if it is not allowed for the conduit or cache 
capacity is exceeded
+            if (!shareHttpClient || clients.size() >= MAX_SIZE) {
+                return new RefCount<HttpClient>(supplier.get(), policy, 
clientParameters, () -> { }).acquire();
+            }
+
+            lock.lock();
+            try {
+                for (final RefCount<HttpClient> p: clients) {
+                    if (cpc.equals(p.policy(), policy) && 
p.clientParameters().equals(clientParameters)) {
+                        return p.acquire();
+                    }
+                }
+
+                final HttpClient client = supplier.get();
+                final RefCount<HttpClient> clientRef = new 
RefCount<HttpClient>(client, policy, clientParameters,
+                        () -> this.remove(policy, clientParameters));
+                clients.add(clientRef);
+
+                return clientRef.acquire();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        void remove(final HTTPClientPolicy policy, final TLSClientParameters 
clientParameters) {
+            lock.lock();
+            try {
+                final Iterator<RefCount<HttpClient>> iterator = 
clients.iterator();
+                while (iterator.hasNext()) {
+                    final RefCount<HttpClient> p = iterator.next();
+                    if (cpc.equals(p.policy(), policy) && 
p.clientParameters().equals(clientParameters)) {
+                        iterator.remove();
+                        break;
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
 
     public HttpClientHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType 
t) throws IOException {
         super(b, ei, t);
@@ -120,42 +229,43 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
      * Close the conduit
      */
     public void close() {
-        if (client instanceof AutoCloseable) {
-            try {
-                ((AutoCloseable)client).close();
-            } catch (Exception e) {
-                //ignore
-            }                
-        } else if (client != null) {
-            String name = client.toString();
-            client = null;
-            tryToShutdownSelector(name);
+        if (clientRef != null) {
+            clientRef.release();
+            clientRef = null;
         }
         defaultAddress = null;
         super.close();
     }
-    private synchronized void tryToShutdownSelector(String n) {
-        // it can take three seconds (or more) for the JVM to determine the 
client
-        // is unreferenced and then shutdown the selector thread, we'll try 
and speed that
-        // up.  This is somewhat of a complete hack.   
-        int idx = n.lastIndexOf('(');
-        if (idx > 0) {
-            n = n.substring(idx + 1);
-            n = n.substring(0, n.length() - 1);
-            n = "HttpClient-" + n + "-SelectorManager";
-        }
-        try {        
-            ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
-            Thread[] threads = new Thread[rootGroup.activeCount()];
-            int cnt = rootGroup.enumerate(threads);
-            for (int x = 0; x < cnt; x++) {
-                if (threads[x].getName().contains(n)) {
-                    threads[x].interrupt();
-                }            
-            }
-        } catch (Throwable t) {
-            //ignore, nothing we can do except wait for the garbage collection
-            //and then the three seconds for the timeout
+    private static void tryToShutdownSelector(HttpClient client) {
+        synchronized (client) {
+            String n = client.toString();
+    
+            // it can take three seconds (or more) for the JVM to determine 
the client
+            // is unreferenced and then shutdown the selector thread, we'll 
try and speed that
+            // up.  This is somewhat of a complete hack.   
+            int idx = n.lastIndexOf('(');
+            if (idx > 0) {
+                n = n.substring(idx + 1);
+                n = n.substring(0, n.length() - 1);
+                n = "HttpClient-" + n + "-SelectorManager";
+            }
+            try {        
+                ThreadGroup rootGroup = 
Thread.currentThread().getThreadGroup();
+                Thread[] threads = new Thread[rootGroup.activeCount()];
+                int cnt = rootGroup.enumerate(threads);
+                for (int x = 0; x < cnt; x++) {
+                    if (threads[x].getName().contains(n)) {
+                        final int index = x;
+                        AccessController.doPrivileged((PrivilegedAction<Void>) 
() -> {
+                            threads[index].interrupt();
+                            return null;
+                        });
+                    }
+                }
+            } catch (Throwable t) {
+                //ignore, nothing we can do except wait for the garbage 
collection
+                //and then the three seconds for the timeout
+            }
         }
     }
     
@@ -171,7 +281,7 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
         if (clientParameters == null) {
             clientParameters = new TLSClientParameters();
         }
-        Object o = 
message.getContextualProperty("force.urlconnection.http.conduit");
+        Object o = 
message.getContextualProperty(FORCE_URLCONNECTION_HTTP_CONDUIT);
         if (o == null) {
             o = message.get("USING_URLCONNECTION");
         }
@@ -204,7 +314,10 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
         
         if (sslURL != null && isSslTargetDifferent(sslURL, uri)) {
             sslURL = null;
-            client = null;
+            if (clientRef != null) {
+                clientRef.release();
+                clientRef = null;
+            }
         }
         // If the HTTP_REQUEST_METHOD is not set, the default is "POST".
         String httpRequestMethod =
@@ -214,11 +327,11 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
             message.put(Message.HTTP_REQUEST_METHOD, "POST");
         }
 
-        HttpClient cl = client;
+        RefCount<HttpClient> cl = clientRef;
         if (cl == null) {
-            int ctimeout = determineConnectionTimeout(message, csPolicy);      
  
+            int ctimeout = determineConnectionTimeout(message, csPolicy);
             ProxySelector ps = new ProxyFactoryProxySelector(proxyFactory, 
csPolicy);
-            
+
             HttpClient.Builder cb = HttpClient.newBuilder()
                 .proxy(ps)
                 .followRedirects(Redirect.NEVER);
@@ -267,10 +380,11 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
                 cb.version(Version.HTTP_1_1);  
             }
 
-            cl = cb.build();
+            final boolean shareHttpClient = 
MessageUtils.getContextualBoolean(message, SHARE_HTTPCLIENT_CONDUIT, true);
+            cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, csPolicy, 
clientParameters, () -> cb.build());
             if (!"https".equals(uri.getScheme()) 
                 && 
!KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
-                && cl.version() == Version.HTTP_2
+                && cl.client().version() == Version.HTTP_2
                 && ("2".equals(verc) || ("auto".equals(verc) && 
"2".equals(HTTP_VERSION)))) {
                 try {
                     // We specifically want HTTP2, but we're using a request
@@ -281,14 +395,14 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
                     HttpRequest.Builder rb = HttpRequest.newBuilder()
                         .uri(uri)
                         .method("OPTIONS", BodyPublishers.noBody());
-                    cl.send(rb.build(), BodyHandlers.ofByteArray());
+                    cl.client().send(rb.build(), BodyHandlers.ofByteArray());
                 } catch (IOException | InterruptedException e) {
                     //
                 }
             } 
-            client = cl;
-        }        
-        message.put(HttpClient.class, cl);
+            clientRef = cl;
+        }
+        message.put(HttpClient.class, cl.client());
         
         message.put(KEY_HTTP_CONNECTION_ADDRESS, address);        
     }
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
index 2d01cf85da..d76f3ef1e8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
@@ -22,16 +22,27 @@ package org.apache.cxf.systest.jaxrs;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.annotation.Annotation;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Type;
 import java.net.URI;
 import java.net.URLEncoder;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
 
 import javax.xml.namespace.QName;
 
@@ -82,9 +93,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -138,6 +152,57 @@ public class JAXRS20ClientServerBookTest extends 
AbstractBusClientServerTestBase
         assertEquals(1023L, book.getId());
     }
 
+    @Test
+    public void testGetGenericBookParallel() throws InterruptedException {
+        final ExecutorService pool = Executors.newFixedThreadPool(100);
+        final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+        final AtomicLong httpClientThreads = new AtomicLong(); 
+
+        final Supplier<Long> captureHttpClientThreads = () ->
+            Arrays
+                .stream(threadMXBean.getAllThreadIds())
+                .mapToObj(id -> threadMXBean.getThreadInfo(id))
+                .filter(Objects::nonNull)
+                .filter(t -> t.getThreadName().startsWith("HttpClient-"))
+                .count();
+
+        final Collection<WebClient> clients = new ArrayList<>();
+        try {
+            final String target = "http://localhost:" + PORT + 
"/bookstore/genericbooks/123";
+
+            for (int i = 0; i < 1000; ++i) {
+                final WebClient client = WebClient.create(target, true);
+                clients.add(client);
+
+                // We are not checking the future completion, but the fact we 
were able 
+                // to execute this amount of requests without blowing live 
threads set 
+                pool.submit(() -> {
+                    Book book = client.sync().get(Book.class);
+                    assertEquals(124L, book.getId());
+
+                    // Capture all "HttpClient-" selector threads
+                    
httpClientThreads.accumulateAndGet(captureHttpClientThreads.get(),
+                        (x1, x2) -> x1 > x2 ? x1 : x2);
+                });
+
+                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
+            }
+        } finally {
+            pool.shutdown();
+            assertThat(pool.awaitTermination(1, TimeUnit.MINUTES), is(true));
+            assertThat(captureHttpClientThreads.get(), greaterThan(0L));
+            assertThat(httpClientThreads.get(), greaterThan(0L));
+            assertThat(httpClientThreads.get(), lessThan(150L)); /* a bit 
higher than pool size */
+        }
+
+        clients.forEach(WebClient::close);
+    
+        // Since JDK-21, HttpClient Implements AutoCloseable
+        if (Runtime.version().feature() > 21) { 
+            assertThat(httpClientThreads.get(), equalTo(0L));
+        }
+    }
+
     @Test
     public void testGetGenericBook() throws Exception {
         String address = "http://localhost:" + PORT + 
"/bookstore/genericbooks/123";
diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java
index 788f88230e..bcb9616c47 100644
--- a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java
+++ b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java
@@ -376,6 +376,8 @@ public class HTTPSConduitTest extends 
AbstractBusClientServerTestBase {
         // Okay, I'm sick of configuration files.
         // This also tests dynamic configuration of the conduit.
         Client client = ClientProxy.getClient(bethal);
+        client.getRequestContext().put("share.httpclient.http.conduit", false);
+
         HTTPConduit http =
             (HTTPConduit) client.getConduit();
 
@@ -419,6 +421,8 @@ public class HTTPSConduitTest extends 
AbstractBusClientServerTestBase {
         // Okay, I'm sick of configuration files.
         // This also tests dynamic configuration of the conduit.
         Client client = ClientProxy.getClient(poltim);
+        client.getRequestContext().put("share.httpclient.http.conduit", false);
+
         HTTPConduit http =
             (HTTPConduit) client.getConduit();
 
@@ -507,6 +511,8 @@ public class HTTPSConduitTest extends 
AbstractBusClientServerTestBase {
         // Okay, I'm sick of configuration files.
         // This also tests dynamic configuration of the conduit.
         Client client = ClientProxy.getClient(bethal);
+        client.getRequestContext().put("share.httpclient.http.conduit", false);
+
         HTTPConduit http =
             (HTTPConduit) client.getConduit();
 
@@ -568,6 +574,8 @@ public class HTTPSConduitTest extends 
AbstractBusClientServerTestBase {
         // Okay, I'm sick of configuration files.
         // This also tests dynamic configuration of the conduit.
         Client client = ClientProxy.getClient(tarpin);
+        client.getRequestContext().put("share.httpclient.http.conduit", false);
+
         HTTPConduit http =
             (HTTPConduit) client.getConduit();
 
@@ -713,6 +721,7 @@ public class HTTPSConduitTest extends 
AbstractBusClientServerTestBase {
         // Okay, I'm sick of configuration files.
         // This also tests dynamic configuration of the conduit.
         Client client = ClientProxy.getClient(gordy);
+        client.getRequestContext().put("share.httpclient.http.conduit", false);
 
         HTTPConduit http =
             (HTTPConduit) client.getConduit();

Reply via email to