This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 4.0.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit b022662c7cece750534769abe83a6d5ce662cf17 Author: Andriy Redko <drr...@gmail.com> AuthorDate: Fri Apr 19 10:30:00 2024 -0400 CXF-8951: Concurrent WebClient usage causes massive thread overhead (#1777) Signed-off-by: Andriy Redko <drr...@gmail.com> (cherry picked from commit a70abba613c3c95cd8891384f6992fd6da2b935b) --- .../cxf/transport/http/HttpClientHTTPConduit.java | 202 ++++++++++++++++----- .../systest/jaxrs/JAXRS20ClientServerBookTest.java | 65 +++++++ .../systest/https/conduit/HTTPSConduitTest.java | 9 + 3 files changed, 232 insertions(+), 44 deletions(-) diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index b25777eeb3..4435bf01e8 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -49,11 +49,14 @@ import java.nio.channels.UnresolvedAddressException; import java.security.AccessController; import java.security.GeneralSecurityException; import java.security.Principal; +import java.security.PrivilegedAction; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.security.cert.Certificate; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -66,6 +69,8 @@ import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -82,6 +87,7 @@ import org.apache.cxf.io.CacheAndWriteOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.http.policy.impl.ClientPolicyCalculator; import org.apache.cxf.transport.https.HttpsURLConnectionInfo; import org.apache.cxf.transport.https.SSLUtils; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; @@ -89,10 +95,113 @@ import org.apache.cxf.ws.addressing.EndpointReferenceType; public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { + private static final String FORCE_URLCONNECTION_HTTP_CONDUIT = "force.urlconnection.http.conduit"; + private static final String SHARE_HTTPCLIENT_CONDUIT = "share.httpclient.http.conduit"; + private static final Set<String> RESTRICTED_HEADERS = getRestrictedHeaders(); - volatile HttpClient client; + private static final HttpClientCache CLIENTS_CACHE = new HttpClientCache(); + volatile RefCount<HttpClient> clientRef; volatile int lastTlsHash = -1; volatile URI sslURL; + + private static final class RefCount<T extends HttpClient> { + private final AtomicLong count = new AtomicLong(); + private final TLSClientParameters clientParameters; + private final HTTPClientPolicy policy; + private final T client; + private final Runnable finalizer; + + RefCount(T client, HTTPClientPolicy policy, TLSClientParameters clientParameters, Runnable finalizer) { + this.client = client; + this.policy = policy; + this.clientParameters = clientParameters; + this.finalizer = finalizer; + } + + RefCount<T> acquire() { + count.incrementAndGet(); + return this; + } + + void release() { + if (count.decrementAndGet() == 0) { + finalizer.run(); + + if (client instanceof AutoCloseable) { + try { + ((AutoCloseable)client).close(); + } catch (Exception e) { + //ignore + } + } else if (client != null) { + tryToShutdownSelector(client); + } + } + } + + HttpClient client() { + return client; + } + + HTTPClientPolicy policy() { + return policy; + } + + public TLSClientParameters clientParameters() { + return clientParameters; + } + } + + private static final class HttpClientCache { + private static final int MAX_SIZE = 100; // Keeping at most 100 clients + + private final List<RefCount<HttpClient>> clients = new ArrayList<>(); + private final ClientPolicyCalculator cpc = new ClientPolicyCalculator(); + private final ReentrantLock lock = new ReentrantLock(); + + RefCount<HttpClient> computeIfAbsent(final boolean shareHttpClient, final HTTPClientPolicy policy, + final TLSClientParameters clientParameters, final Supplier<HttpClient> supplier) { + + // Do not share if it is not allowed for the conduit or cache capacity is exceeded + if (!shareHttpClient || clients.size() >= MAX_SIZE) { + return new RefCount<HttpClient>(supplier.get(), policy, clientParameters, () -> { }).acquire(); + } + + lock.lock(); + try { + for (final RefCount<HttpClient> p: clients) { + if (cpc.equals(p.policy(), policy) && p.clientParameters().equals(clientParameters)) { + return p.acquire(); + } + } + + final HttpClient client = supplier.get(); + final RefCount<HttpClient> clientRef = new RefCount<HttpClient>(client, policy, clientParameters, + () -> this.remove(policy, clientParameters)); + clients.add(clientRef); + + return clientRef.acquire(); + } finally { + lock.unlock(); + } + } + + void remove(final HTTPClientPolicy policy, final TLSClientParameters clientParameters) { + lock.lock(); + try { + final Iterator<RefCount<HttpClient>> iterator = clients.iterator(); + while (iterator.hasNext()) { + final RefCount<HttpClient> p = iterator.next(); + if (cpc.equals(p.policy(), policy) && p.clientParameters().equals(clientParameters)) { + iterator.remove(); + break; + } + } + } finally { + lock.unlock(); + } + } + } public HttpClientHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t) throws IOException { super(b, ei, t); @@ -120,42 +229,43 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { * Close the conduit */ public void close() { - if (client instanceof AutoCloseable) { - try { - ((AutoCloseable)client).close(); - } catch (Exception e) { - //ignore - } - } else if (client != null) { - String name = client.toString(); - client = null; - tryToShutdownSelector(name); + if (clientRef != null) { + clientRef.release(); + clientRef = null; } defaultAddress = null; super.close(); } - private synchronized void tryToShutdownSelector(String n) { - // it can take three seconds (or more) for the JVM to determine the client - // is unreferenced and then shutdown the selector thread, we'll try and speed that - // up. This is somewhat of a complete hack. - int idx = n.lastIndexOf('('); - if (idx > 0) { - n = n.substring(idx + 1); - n = n.substring(0, n.length() - 1); - n = "HttpClient-" + n + "-SelectorManager"; - } - try { - ThreadGroup rootGroup = Thread.currentThread().getThreadGroup(); - Thread[] threads = new Thread[rootGroup.activeCount()]; - int cnt = rootGroup.enumerate(threads); - for (int x = 0; x < cnt; x++) { - if (threads[x].getName().contains(n)) { - threads[x].interrupt(); - } - } - } catch (Throwable t) { - //ignore, nothing we can do except wait for the garbage collection - //and then the three seconds for the timeout + private static void tryToShutdownSelector(HttpClient client) { + synchronized (client) { + String n = client.toString(); + + // it can take three seconds (or more) for the JVM to determine the client + // is unreferenced and then shutdown the selector thread, we'll try and speed that + // up. This is somewhat of a complete hack. + int idx = n.lastIndexOf('('); + if (idx > 0) { + n = n.substring(idx + 1); + n = n.substring(0, n.length() - 1); + n = "HttpClient-" + n + "-SelectorManager"; + } + try { + ThreadGroup rootGroup = Thread.currentThread().getThreadGroup(); + Thread[] threads = new Thread[rootGroup.activeCount()]; + int cnt = rootGroup.enumerate(threads); + for (int x = 0; x < cnt; x++) { + if (threads[x].getName().contains(n)) { + final int index = x; + AccessController.doPrivileged((PrivilegedAction<Void>) () -> { + threads[index].interrupt(); + return null; + }); + } + } + } catch (Throwable t) { + //ignore, nothing we can do except wait for the garbage collection + //and then the three seconds for the timeout + } } } @@ -171,7 +281,7 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { if (clientParameters == null) { clientParameters = new TLSClientParameters(); } - Object o = message.getContextualProperty("force.urlconnection.http.conduit"); + Object o = message.getContextualProperty(FORCE_URLCONNECTION_HTTP_CONDUIT); if (o == null) { o = message.get("USING_URLCONNECTION"); } @@ -204,7 +314,10 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { if (sslURL != null && isSslTargetDifferent(sslURL, uri)) { sslURL = null; - client = null; + if (clientRef != null) { + clientRef.release(); + clientRef = null; + } } // If the HTTP_REQUEST_METHOD is not set, the default is "POST". String httpRequestMethod = @@ -214,11 +327,11 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { message.put(Message.HTTP_REQUEST_METHOD, "POST"); } - HttpClient cl = client; + RefCount<HttpClient> cl = clientRef; if (cl == null) { - int ctimeout = determineConnectionTimeout(message, csPolicy); + int ctimeout = determineConnectionTimeout(message, csPolicy); ProxySelector ps = new ProxyFactoryProxySelector(proxyFactory, csPolicy); - + HttpClient.Builder cb = HttpClient.newBuilder() .proxy(ps) .followRedirects(Redirect.NEVER); @@ -267,10 +380,11 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { cb.version(Version.HTTP_1_1); } - cl = cb.build(); + final boolean shareHttpClient = MessageUtils.getContextualBoolean(message, SHARE_HTTPCLIENT_CONDUIT, true); + cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, csPolicy, clientParameters, () -> cb.build()); if (!"https".equals(uri.getScheme()) && !KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) - && cl.version() == Version.HTTP_2 + && cl.client().version() == Version.HTTP_2 && ("2".equals(verc) || ("auto".equals(verc) && "2".equals(HTTP_VERSION)))) { try { // We specifically want HTTP2, but we're using a request @@ -281,14 +395,14 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { HttpRequest.Builder rb = HttpRequest.newBuilder() .uri(uri) .method("OPTIONS", BodyPublishers.noBody()); - cl.send(rb.build(), BodyHandlers.ofByteArray()); + cl.client().send(rb.build(), BodyHandlers.ofByteArray()); } catch (IOException | InterruptedException e) { // } } - client = cl; - } - message.put(HttpClient.class, cl); + clientRef = cl; + } + message.put(HttpClient.class, cl.client()); message.put(KEY_HTTP_CONNECTION_ADDRESS, address); } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java index 2d01cf85da..d76f3ef1e8 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java @@ -22,16 +22,27 @@ package org.apache.cxf.systest.jaxrs; import java.io.IOException; import java.io.InputStream; import java.lang.annotation.Annotation; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.lang.reflect.Type; import java.net.URI; import java.net.URLEncoder; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; import javax.xml.namespace.QName; @@ -82,9 +93,12 @@ import org.junit.BeforeClass; import org.junit.Test; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -138,6 +152,57 @@ public class JAXRS20ClientServerBookTest extends AbstractBusClientServerTestBase assertEquals(1023L, book.getId()); } + @Test + public void testGetGenericBookParallel() throws InterruptedException { + final ExecutorService pool = Executors.newFixedThreadPool(100); + final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + final AtomicLong httpClientThreads = new AtomicLong(); + + final Supplier<Long> captureHttpClientThreads = () -> + Arrays + .stream(threadMXBean.getAllThreadIds()) + .mapToObj(id -> threadMXBean.getThreadInfo(id)) + .filter(Objects::nonNull) + .filter(t -> t.getThreadName().startsWith("HttpClient-")) + .count(); + + final Collection<WebClient> clients = new ArrayList<>(); + try { + final String target = "http://localhost:" + PORT + "/bookstore/genericbooks/123"; + + for (int i = 0; i < 1000; ++i) { + final WebClient client = WebClient.create(target, true); + clients.add(client); + + // We are not checking the future completion, but the fact we were able + // to execute this amount of requests without blowing live threads set + pool.submit(() -> { + Book book = client.sync().get(Book.class); + assertEquals(124L, book.getId()); + + // Capture all "HttpClient-" selector threads + httpClientThreads.accumulateAndGet(captureHttpClientThreads.get(), + (x1, x2) -> x1 > x2 ? x1 : x2); + }); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5)); + } + } finally { + pool.shutdown(); + assertThat(pool.awaitTermination(1, TimeUnit.MINUTES), is(true)); + assertThat(captureHttpClientThreads.get(), greaterThan(0L)); + assertThat(httpClientThreads.get(), greaterThan(0L)); + assertThat(httpClientThreads.get(), lessThan(150L)); /* a bit higher that pool size */ + } + + clients.forEach(WebClient::close); + + // Since JDK-21, HttpClient Implements AutoCloseable + if (Runtime.version().feature() > 21) { + assertThat(httpClientThreads.get(), equalTo(0L)); + } + } + @Test public void testGetGenericBook() throws Exception { String address = "http://localhost:" + PORT + "/bookstore/genericbooks/123"; diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java index 788f88230e..bcb9616c47 100644 --- a/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java +++ b/systests/transports/src/test/java/org/apache/cxf/systest/https/conduit/HTTPSConduitTest.java @@ -376,6 +376,8 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { // Okay, I'm sick of configuration files. // This also tests dynamic configuration of the conduit. Client client = ClientProxy.getClient(bethal); + client.getRequestContext().put("share.httpclient.http.conduit", false); + HTTPConduit http = (HTTPConduit) client.getConduit(); @@ -419,6 +421,8 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { // Okay, I'm sick of configuration files. // This also tests dynamic configuration of the conduit. Client client = ClientProxy.getClient(poltim); + client.getRequestContext().put("share.httpclient.http.conduit", false); + HTTPConduit http = (HTTPConduit) client.getConduit(); @@ -507,6 +511,8 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { // Okay, I'm sick of configuration files. // This also tests dynamic configuration of the conduit. Client client = ClientProxy.getClient(bethal); + client.getRequestContext().put("share.httpclient.http.conduit", false); + HTTPConduit http = (HTTPConduit) client.getConduit(); @@ -568,6 +574,8 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { // Okay, I'm sick of configuration files. // This also tests dynamic configuration of the conduit. Client client = ClientProxy.getClient(tarpin); + client.getRequestContext().put("share.httpclient.http.conduit", false); + HTTPConduit http = (HTTPConduit) client.getConduit(); @@ -713,6 +721,7 @@ public class HTTPSConduitTest extends AbstractBusClientServerTestBase { // Okay, I'm sick of configuration files. // This also tests dynamic configuration of the conduit. Client client = ClientProxy.getClient(gordy); + client.getRequestContext().put("share.httpclient.http.conduit", false); HTTPConduit http = (HTTPConduit) client.getConduit();