This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch 3.6.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 5b69dd06a61a8985e9166108e66e56f481a438cf
Author: Daniel Kulp <d...@kulp.com>
AuthorDate: Tue Jun 13 13:27:57 2023 -0400

    [CXF-8885] Make an attempt to get the HttpClient selector threads to 
shutdown sooner
    Technically, they should shut themselves down about 3 seconds after a 
garbage collection assuming the client they are associated with is cleaned up 
and released.   However, if many clients are used, this may be too long and 
thus we'll make an attempt (via some hacks) to get the thread to shutdown 
sooner.  It's a shame HttpClient doesn't implement Closeable. :(
    
    (cherry picked from commit 65711680af99de16f56cbaa819d207edb9428e8a)
---
 .../cxf/transport/http/HttpClientHTTPConduit.java  | 40 ++++++++++
 .../cxf/systest/jaxws/JaxWsClientThreadTest.java   | 86 ++++++++++++++++++++++
 2 files changed, 126 insertions(+)

diff --git 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
index 2546e24bac..af7c81b2ec 100644
--- 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
+++ 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PushbackInputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -33,6 +34,7 @@ import java.net.ProxySelector;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLConnection;
 import java.net.UnknownHostException;
 import java.net.http.HttpClient;
 import java.net.http.HttpClient.Redirect;
@@ -71,6 +73,7 @@ import javax.net.ssl.SSLSession;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.util.PropertyUtils;
+import org.apache.cxf.common.util.ReflectionUtil;
 import org.apache.cxf.configuration.jsse.TLSClientParameters;
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.helpers.JavaUtils;
@@ -110,6 +113,43 @@ public class HttpClientHTTPConduit extends 
URLConnectionHTTPConduit {
                 || lastURL.getPort() != url.getPort();
     }
     
+    /**
+     * Close the conduit
+     */
+    public void close() {
+        if (client != null) {
+            String name = client.toString();
+            client = null;
+            tryToShutdownSelector(name);
+        }
+        defaultAddress = null;
+        super.close();
+    }
+    private synchronized void tryToShutdownSelector(String n) {
+        // it can take three seconds (or more) for the JVM to determine the 
client
+        // is unreferenced and then shutdown the selector thread, we'll try 
and speed that
+        // up.  This is somewhat of a complete hack.   
+        int idx = n.lastIndexOf('(');
+        if (idx > 0) {
+            n = n.substring(idx + 1);
+            n = n.substring(0, n.length() - 1);
+            n = "HttpClient-" + n + "-SelectorManager";
+        }
+        try {        
+            ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+            Thread threads[] = new Thread[rootGroup.activeCount()];
+            int cnt = rootGroup.enumerate(threads);
+            for (int x = 0; x < cnt; x++) {
+                if (threads[x].getName().contains(n)) {
+                    threads[x].interrupt();
+                }            
+            }
+        } catch (Throwable t) {
+            //ignore, nothing we can do except wait for the garbage collection
+            //and then the three seconds for the timeout
+        }
+    }
+    
     @Override
     protected void setupConnection(Message message, Address address, 
HTTPClientPolicy csPolicy) throws IOException {
         URI uri = address.getURI();
diff --git 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java
 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java
index d7c247b797..e853b0bf11 100644
--- 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java
+++ 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java
@@ -220,4 +220,90 @@ public class JaxWsClientThreadTest extends AbstractCXFTest 
{
                    .get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY));
     }
 
+    @Test
+    public void testMultiGreeterThreadSafety() throws Throwable {
+
+        URL url = getClass().getResource("/wsdl/hello_world.wsdl");
+        final jakarta.xml.ws.Service s = jakarta.xml.ws.Service.create(url, 
serviceName);
+
+        final int numThreads = 50;
+        final Throwable[] errorHolder = new Throwable[numThreads];
+        
+        ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+        ThreadGroup parentGroup;
+        while ((parentGroup = rootGroup.getParent()) != null) {
+            rootGroup = parentGroup;
+        }
+        int start = rootGroup.activeCount();
+
+        Thread[] threads = new Thread[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            final int tid = i;
+            Runnable r = new Runnable() {
+                public void run() {
+                    final Greeter greeter = s.getPort(portName, Greeter.class);
+                    try (AutoCloseable c = (AutoCloseable)greeter){
+                        final InvocationHandler handler = 
Proxy.getInvocationHandler(greeter);
+                        Map<String, Object> requestContext = 
((BindingProvider)handler).getRequestContext();                        
+                        
+                        final String protocol = "http-" + 
Thread.currentThread().getId();
+                        for (int i = 0; i < 10; i++) {
+                            String threadSpecificaddress = protocol + 
"://localhost:80/" + i;
+                            
requestContext.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
+                                               threadSpecificaddress);
+                            assertEquals("we get what we set", 
threadSpecificaddress, requestContext
+                                         
.get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY));
+                            try {
+                                greeter.greetMe("Hi");
+                            } catch (WebServiceException expected) {
+                                //expected.getCause().printStackTrace();
+                                MalformedURLException mue = 
(MalformedURLException)expected
+                                    .getCause();
+                                if (mue == null || mue.getMessage() == null) {
+                                    throw expected;
+                                }
+                                assertTrue("protocol contains thread id from 
context", mue.getMessage()
+                                    .indexOf(protocol) != 0);
+                            }
+
+                            
requestContext.remove(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
+                            assertNull("property is null", requestContext
+                                         
.get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY));
+
+                        }
+                    } catch (Throwable t) {
+                        // capture assert failures
+                        errorHolder[tid] = t;
+                    }
+                }
+            };
+            threads[i] = new Thread(r);
+        }
+        for (int i = 0; i < numThreads; i++) {
+            threads[i].start();
+        }
+        for (int i = 0; i < numThreads; i++) {
+            threads[i].join();
+        }
+        for (int i = 0; i < numThreads; i++) {
+            if (errorHolder[i] != null) {
+                throw errorHolder[i];
+            }
+        }
+        
+        int end = rootGroup.activeCount();
+        int count = 0;
+        while (end > start && count < 30) {
+            Thread.sleep(100);
+            System.gc();
+            end = rootGroup.activeCount();
+        }
+        
+        
+        System.out.println("Start: " + start + "     End: " + end);
+        // we'll allow a few extra threads to be created for various things 
like GC, but we definitely shouldn't be anywhere 
+        // near numThreads of extra threads
+        assertTrue("Too many extra trheads created  " + end + "/" + start, 
(end - start) < 5);
+
+    }
 }

Reply via email to