This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.4.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 04cce94cfd9835f789016d79a905896027a68a30 Author: Andriy Redko <[email protected]> AuthorDate: Thu Sep 23 19:42:18 2021 -0400 CXF-8597: CXF JAXRS client not closing HTTP connections (#851) * CXF-8597: CXF JAXRS client not closing HTTP connections * Added test case for generic WebClient flow * Added test case for generic JAXRSClientFactoryBean / proxy flow (cherry picked from commit d6a449a3a30aeb950b7de567d8d9e8aa5b00da36) --- .../org/apache/cxf/jaxrs/impl/ResponseImpl.java | 22 ++----- .../jaxrs/ClientHttpConnectionOutInterceptor.java | 71 ++++++++++++++++++++++ .../jaxrs/JAXRSMultithreadedClientTest.java | 17 ++++++ .../systest/jaxrs/JAXRSRequestDispatcherTest.java | 38 ++++++++++++ 4 files changed, 130 insertions(+), 18 deletions(-) diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java index b7ba081..fc711f8 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java @@ -55,7 +55,6 @@ import javax.ws.rs.core.Response.Status.Family; import javax.ws.rs.ext.ReaderInterceptor; import javax.ws.rs.ext.RuntimeDelegate.HeaderDelegate; import javax.xml.stream.XMLStreamReader; -import javax.xml.transform.Source; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.io.ReaderInputStream; @@ -405,7 +404,7 @@ public final class ResponseImpl extends Response { @Override public <T> T readEntity(Class<T> cls, Annotation[] anns) throws ProcessingException, IllegalStateException { - return doReadEntity(cls, cls, anns, true); + return doReadEntity(cls, cls, anns); } @Override @@ -413,20 +412,13 @@ public final class ResponseImpl extends Response { public <T> T readEntity(GenericType<T> genType, Annotation[] anns) throws ProcessingException, IllegalStateException { return doReadEntity((Class<T>) genType.getRawType(), - genType.getType(), anns, true); + genType.getType(), anns); } public <T> T doReadEntity(Class<T> cls, Type t, Annotation[] anns) throws ProcessingException, IllegalStateException { - return doReadEntity(cls, t, anns, false); - } - - public <T> T doReadEntity(Class<T> cls, Type t, Annotation[] anns, boolean closeAfterRead) - throws ProcessingException, IllegalStateException { checkEntityIsClosed(); - //according to javadoc, should close when is not buffered. - boolean shouldClose = !this.entityBufferred; if (lastEntity != null && cls.isAssignableFrom(lastEntity.getClass()) && !(lastEntity instanceof InputStream)) { @@ -476,11 +468,7 @@ public final class ResponseImpl extends Response { responseMessage); // close the entity after readEntity is called. T tCastLastEntity = castLastEntity(); - shouldClose = shouldClose && !(tCastLastEntity instanceof AutoCloseable) - && !(tCastLastEntity instanceof Source); - if (closeAfterRead && shouldClose) { - close(); - } + autoClose(cls, false); return tCastLastEntity; } catch (NoContentException ex) { //when content is empty, return null instead of throw exception to pass TCK @@ -489,9 +477,7 @@ public final class ResponseImpl extends Response { autoClose(cls, true); reportMessageHandlerProblem("MSG_READER_PROBLEM", cls, mediaType, ex); } else { - if (closeAfterRead && shouldClose) { - close(); - } + autoClose(cls, false); return null; } } catch (Exception ex) { diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java new file mode 100644 index 0000000..66d8413 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.message.Message; +import org.apache.cxf.phase.AbstractPhaseInterceptor; +import org.apache.cxf.phase.Phase; + +class ClientHttpConnectionOutInterceptor extends AbstractPhaseInterceptor<Message> { + private Collection<HttpURLConnection> connections = new ArrayList<>(); + + ClientHttpConnectionOutInterceptor() { + super(Phase.SEND_ENDING); + } + + @Override + public void handleMessage(Message message) throws Fault { + final HttpURLConnection connection = (HttpURLConnection) message.get("http.connection"); + synchronized (connections) { + connections.add(connection); + } + } + + public boolean checkAllClosed() { + synchronized (connections) { + if (connections.isEmpty()) { + return false; + } + + return !connections + .stream() + .anyMatch(this::hasUnclosedInputStream); + } + } + + private boolean hasUnclosedInputStream(HttpURLConnection connection) { + try { + final InputStream inputStream = connection.getInputStream(); + inputStream.read(new byte [] {}); /* 0 bytes to read */ + return true; + } catch (IOException ex) { + // The HttpInputStream throws an IOException in case the input stream is already + // closed (since we actually read nothing). + return !ex.getMessage().equals("stream is closed"); + } + } +} \ No newline at end of file diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java index 92376f7..91533e9 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java @@ -33,6 +33,7 @@ import javax.ws.rs.core.Response; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.jaxrs.client.Client; import org.apache.cxf.jaxrs.client.JAXRSClientFactory; +import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean; import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; @@ -41,7 +42,9 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public class JAXRSMultithreadedClientTest extends AbstractBusClientServerTestBase { @@ -102,6 +105,20 @@ public class JAXRSMultithreadedClientTest extends AbstractBusClientServerTestBas runProxies(proxy.echoThroughBookStoreSub(), 10, true, true); } + + @Test + public void testSimpleProxyEnsureResponseStreamIsClosed() throws Exception { + final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor(); + final JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean(); + bean.setAddress("http://localhost:" + PORT); + bean.setServiceClass(BookStore.class); + bean.getOutInterceptors().add(interceptor); + + final BookStore proxy = bean.create(BookStore.class); + runProxies(proxy, 10, true, false); + + assertThat(interceptor.checkAllClosed(), is(true)); + } private void runWebClients(WebClient client, int numberOfClients, boolean threadSafe, boolean stateCanBeChanged) throws Exception { diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java index 7002a30..5055d9d 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java @@ -19,6 +19,7 @@ package org.apache.cxf.systest.jaxrs; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -33,8 +34,11 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase { @@ -60,6 +64,7 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase } private void doTestGetBookHTML(String endpointAddress) throws Exception { + WebClient client = WebClient.create(endpointAddress) .accept(MediaType.TEXT_HTML); @@ -143,6 +148,38 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase assertEquals("Welcome", welcome); } + @Test + public void testGetBookHTMLFromEnsureResponseStreamIsUnclosed() throws Exception { + final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor(); + + String endpointAddress = "http://localhost:" + PORT + "/the/bookstore4/books/html/123"; + WebClient client = WebClient.create(endpointAddress).accept(MediaType.TEXT_HTML); + WebClient.getConfig(client).getOutInterceptors().add(interceptor); + + XMLSource source = client.get(XMLSource.class); + Map<String, String> namespaces = new HashMap<>(); + namespaces.put("xhtml", "http://www.w3.org/1999/xhtml"); + namespaces.put("books", "http://www.w3.org/books"); + String value = source.getValue("xhtml:html/xhtml:body/xhtml:ul/books:bookTag", namespaces); + assertEquals("CXF Rocks", value); + + assertThat(interceptor.checkAllClosed(), is(false)); + } + + @Test + public void testGetBookHTMLFromEnsureResponseStreamIsAutoClosed() throws Exception { + final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor(); + final Map<String, Object> properties = Collections.singletonMap("response.stream.auto.close", true); + + String endpointAddress = "http://localhost:" + PORT + "/the/bookstore4/books/html/123"; + WebClient client = WebClient.create(endpointAddress, properties).accept(MediaType.TEXT_HTML); + WebClient.getConfig(client).getOutInterceptors().add(interceptor); + + final String source = client.get(String.class); + assertThat(source, containsString("CXF Rocks")); + assertThat(interceptor.checkAllClosed(), is(true)); + } + private void doTestGetBookHTMLFromWelcomeList(String address) throws Exception { WebClient client = WebClient.create(address) .accept(MediaType.TEXT_HTML); @@ -154,4 +191,5 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase String value = source.getValue("xhtml:html/xhtml:body/xhtml:ul/books:bookTag", namespaces); assertEquals("Welcome to CXF", value); } + }
