This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.5.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 501cf8bdfd1987288f224c24f7114da18a184b9a Author: Andriy Redko <drr...@gmail.com> AuthorDate: Sat Nov 11 10:30:39 2023 -0500 CXF-8911:Allow wrapping AsyncHTTPConduit response processing (using new AsyncHttpResponseWrapperFactory bus extension) (#1510) (cherry picked from commit 74069a140737603a0a59435de308401c4351a80b) (cherry picked from commit 03f5ea8eb1e1edd6502cb5df2d28b618422a572b) --- .../http/asyncclient/AsyncHTTPConduit.java | 32 +++++++++---- .../AsyncHttpResponseWrapperFactory.java | 55 ++++++++++++++++++++++ .../http/asyncclient/AsyncHTTPConduitTest.java | 38 ++++++++++++++- .../http/asyncclient/hc5/AsyncHTTPConduit.java | 21 ++++++++- .../hc5/AsyncHttpResponseWrapperFactory.java | 55 ++++++++++++++++++++++ .../http/asyncclient/hc5/AsyncHTTPConduitTest.java | 38 ++++++++++++++- 6 files changed, 227 insertions(+), 12 deletions(-) diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java index b0294ad7ee..e0a1122206 100755 --- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java +++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java @@ -66,6 +66,7 @@ import org.apache.cxf.transport.http.Address; import org.apache.cxf.transport.http.Headers; import org.apache.cxf.transport.http.URLConnectionHTTPConduit; import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory.UseAsyncPolicy; +import org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transport.https.HttpsURLConnectionInfo; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.version.Version; @@ -99,13 +100,15 @@ import org.apache.http.nio.util.HeapByteBufferAllocator; public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit { public static final String USE_ASYNC = "use.async.http.conduit"; - final AsyncHTTPConduitFactory factory; - volatile int lastTlsHash = -1; - volatile Object sslState; - volatile URI sslURL; - volatile SSLContext sslContext; - volatile SSLSession session; - volatile CloseableHttpAsyncClient client; + private final AsyncHTTPConduitFactory factory; + private final AsyncHttpResponseWrapperFactory asyncHttpResponseWrapperFactory; + + private volatile int lastTlsHash = -1; + private volatile Object sslState; + private volatile URI sslURL; + private volatile SSLContext sslContext; + private volatile SSLSession session; + private volatile CloseableHttpAsyncClient client; public AsyncHTTPConduit(Bus b, @@ -114,6 +117,7 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit { AsyncHTTPConduitFactory factory) throws IOException { super(b, ei, t); this.factory = factory; + this.asyncHttpResponseWrapperFactory = bus.getExtension(AsyncHttpResponseWrapperFactory.class); } public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws IOException { @@ -478,7 +482,7 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit { return; } - CXFResponseCallback responseCallback = new CXFResponseCallback() { + CXFResponseCallback delegate = new CXFResponseCallback() { @Override public void responseReceived(HttpResponse response) { setHttpResponse(response); @@ -486,6 +490,18 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit { }; + CXFResponseCallback responseCallback = delegate; + if (asyncHttpResponseWrapperFactory != null) { + final AsyncHttpResponseWrapper wrapper = asyncHttpResponseWrapperFactory.create(); + if (wrapper != null) { + responseCallback = new CXFResponseCallback() { + @Override + public void responseReceived(HttpResponse response) { + wrapper.responseReceived(response, delegate::responseReceived); + } + }; + } + } FutureCallback<Boolean> callback = new FutureCallback<Boolean>() { public void 
completed(Boolean result) { diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java new file mode 100644 index 0000000000..bec9e695fe --- /dev/null +++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.http.asyncclient; + +import java.util.function.Consumer; + +import org.apache.cxf.Bus; +import org.apache.http.HttpResponse; + +/** + * The {@link Bus} extension to allow wrapping up the response processing of + * the {@link AsyncHTTPConduit} instance. + */ +@FunctionalInterface +public interface AsyncHttpResponseWrapperFactory { + /** + * Creates new instance of the {@link AsyncHttpResponseWrapper} + * @return new instance of the {@link AsyncHttpResponseWrapper} (or null) + */ + AsyncHttpResponseWrapper create(); + + /** + * The wrapper around the response that will be called by the {@link AsyncHTTPConduit} + * instance once the response is received. 
+ */ + interface AsyncHttpResponseWrapper { + /** + * The callback which is called by the {@link AsyncHTTPConduit} instance once + * the response is received. The delegating response handler is passed as + * an argument and has to be called. + * @param response the response received + * @param delegate delegating response handler + */ + default void responseReceived(HttpResponse response, Consumer<HttpResponse> delegate) { + delegate.accept(response); + } + } +} diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java index ec412163f6..7f40706620 100644 --- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java +++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import javax.xml.ws.AsyncHandler; import javax.xml.ws.Endpoint; @@ -44,6 +45,7 @@ import org.apache.cxf.message.MessageImpl; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.apache.cxf.transport.http.HTTPConduit; import org.apache.cxf.transport.http.HTTPConduitFactory; +import org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.workqueue.AutomaticWorkQueueImpl; import org.apache.cxf.workqueue.WorkQueueManager; @@ -51,12 +53,15 @@ import org.apache.hello_world_soap_http.Greeter; import org.apache.hello_world_soap_http.SOAPService; import org.apache.hello_world_soap_http.types.GreetMeLaterResponse; import 
org.apache.hello_world_soap_http.types.GreetMeResponse; +import org.apache.http.HttpResponse; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -243,7 +248,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { } @Test - public void testInovationWithHCAddress() throws Exception { + public void testInvocationWithHCAddress() throws Exception { String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort"; JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); factory.setServiceClass(Greeter.class); @@ -264,6 +269,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { String response = greeter.greetMe("test"); assertEquals("Get a wrong response", "Hello test", response); } + @Test public void testCall() throws Exception { updateAddressPort(g, PORT); @@ -274,6 +280,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { c.setClient(cp); assertEquals("Hello " + request, g.greetMe(request)); } + @Test public void testCallAsync() throws Exception { updateAddressPort(g, PORT); @@ -294,6 +301,35 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { }).get(); } + @Test + public void testCallAsyncWithResponseWrapper() throws Exception { + try { + final CountDownLatch latch = new CountDownLatch(1); + final AsyncHttpResponseWrapper wrapper = new AsyncHttpResponseWrapper() { + @Override + public void responseReceived(HttpResponse response, Consumer<HttpResponse> delegate) { + delegate.accept(response); + latch.countDown(); + } + }; + + getStaticBus().setExtension(() -> wrapper, AsyncHttpResponseWrapperFactory.class); + + final String address = "hc://http://localhost:" + PORT + 
"/SoapContext/SoapPort"; + final Greeter greeter = new SOAPService().getSoapPort(); + setAddress(greeter, address); + + greeter.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() { + public void handleResponse(Response<GreetMeLaterResponse> res) { + } + }).get(); + + assertThat(latch.await(5, TimeUnit.SECONDS), is(true)); + } finally { + getStaticBus().setExtension(null, AsyncHttpResponseWrapperFactory.class); + } + } + @Test public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception { // This test is especially targeted for RHEL 6.8 diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java index 7e721628f0..010ef1b9aa 100644 --- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java +++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java @@ -66,6 +66,7 @@ import org.apache.cxf.transport.http.Address; import org.apache.cxf.transport.http.Headers; import org.apache.cxf.transport.http.URLConnectionHTTPConduit; import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory.UseAsyncPolicy; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transport.https.HttpsURLConnectionInfo; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.version.Version; @@ -105,6 +106,7 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit { public static final String USE_ASYNC = "use.async.http.conduit"; private final AsyncHTTPConduitFactory factory; + private final AsyncHttpResponseWrapperFactory asyncHttpResponseWrapperFactory; private volatile int lastTlsHash = -1; private volatile Object sslState; private volatile URI sslURL; @@ -116,6 +118,7 @@ 
public class AsyncHTTPConduit extends URLConnectionHTTPConduit { throws IOException { super(b, ei, t); this.factory = factory; + this.asyncHttpResponseWrapperFactory = bus.getExtension(AsyncHttpResponseWrapperFactory.class); } public synchronized CloseableHttpAsyncClient getHttpAsyncClient(final TlsStrategy tlsStrategy) @@ -491,8 +494,8 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit { if (connectionFuture != null) { return; } - - CXFResponseCallback responseCallback = new CXFResponseCallback() { + + final CXFResponseCallback delegate = new CXFResponseCallback() { @Override public void responseReceived(HttpResponse response) { setHttpResponse(response); @@ -500,6 +503,20 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit { }; + CXFResponseCallback responseCallback = delegate; + if (asyncHttpResponseWrapperFactory != null) { + final AsyncHttpResponseWrapper wrapper = asyncHttpResponseWrapperFactory.create(); + if (wrapper != null) { + responseCallback = new CXFResponseCallback() { + @Override + public void responseReceived(HttpResponse response) { + wrapper.responseReceived(response, delegate::responseReceived); + } + }; + } + } + + FutureCallback<Boolean> callback = new FutureCallback<Boolean>() { public void completed(Boolean result) { diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java new file mode 100644 index 0000000000..eb0725c4c8 --- /dev/null +++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.http.asyncclient.hc5; + +import java.util.function.Consumer; + +import org.apache.cxf.Bus; +import org.apache.hc.core5.http.HttpResponse; + +/** + * The {@link Bus} extension to allow wrapping up the response processing of + * the {@link AsyncHTTPConduit} instance. + */ +@FunctionalInterface +public interface AsyncHttpResponseWrapperFactory { + /** + * Creates new instance of the {@link AsyncHttpResponseWrapper} + * @return new instance of the {@link AsyncHttpResponseWrapper} (or null) + */ + AsyncHttpResponseWrapper create(); + + /** + * The wrapper around the response that will be called by the {@link AsyncHTTPConduit} + * instance once the response is received. + */ + interface AsyncHttpResponseWrapper { + /** + * The callback which is called by the {@link AsyncHTTPConduit} instance once + * the response is received. The delegating response handler is passed as + * an argument and has to be called. 
+ * @param response the response received + * @param delegate delegating response handler + */ + default void responseReceived(HttpResponse response, Consumer<HttpResponse> delegate) { + delegate.accept(response); + } + } +} diff --git a/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java index 273f534116..13615860d4 100644 --- a/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java +++ b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import javax.xml.ws.AsyncHandler; import javax.xml.ws.Endpoint; @@ -45,9 +46,11 @@ import org.apache.cxf.message.MessageImpl; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.apache.cxf.transport.http.HTTPConduit; import org.apache.cxf.transport.http.HTTPConduitFactory; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.workqueue.AutomaticWorkQueueImpl; import org.apache.cxf.workqueue.WorkQueueManager; +import org.apache.hc.core5.http.HttpResponse; import org.apache.hello_world_soap_http.Greeter; import org.apache.hello_world_soap_http.SOAPService; import org.apache.hello_world_soap_http.types.GreetMeLaterResponse; @@ -57,6 +60,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -217,7 +222,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { } @Test - public void testInovationWithHCAddress() throws Exception { + public void testInvocationWithHCAddress() throws Exception { String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort"; JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); factory.setServiceClass(Greeter.class); @@ -238,6 +243,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { String response = greeter.greetMe("test"); assertEquals("Get a wrong response", "Hello test", response); } + @Test public void testCall() throws Exception { updateAddressPort(g, PORT); @@ -248,6 +254,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { c.setClient(cp); assertEquals("Hello " + request, g.greetMe(request)); } + @Test public void testCallAsync() throws Exception { updateAddressPort(g, PORT); @@ -268,6 +275,35 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase { }).get(); } + @Test + public void testCallAsyncWithResponseWrapper() throws Exception { + try { + final CountDownLatch latch = new CountDownLatch(1); + final AsyncHttpResponseWrapper wrapper = new AsyncHttpResponseWrapper() { + @Override + public void responseReceived(HttpResponse response, Consumer<HttpResponse> delegate) { + delegate.accept(response); + latch.countDown(); + } + }; + + getStaticBus().setExtension(() -> wrapper, AsyncHttpResponseWrapperFactory.class); + + final String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort"; + final Greeter greeter = new SOAPService().getSoapPort(); + setAddress(greeter, address); + + greeter.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() { + public void handleResponse(Response<GreetMeLaterResponse> res) { + } + }).get(); + + assertThat(latch.await(5, 
TimeUnit.SECONDS), is(true)); + } finally { + getStaticBus().setExtension(null, AsyncHttpResponseWrapperFactory.class); + } + } + @Test public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception { // This test is especially targeted for RHEL 6.8