This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.5.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 501cf8bdfd1987288f224c24f7114da18a184b9a
Author: Andriy Redko <drr...@gmail.com>
AuthorDate: Sat Nov 11 10:30:39 2023 -0500

    CXF-8911: Allow wrapping AsyncHTTPConduit response processing (using new 
AsyncHttpResponseWrapperFactory bus extension) (#1510)
    
    (cherry picked from commit 74069a140737603a0a59435de308401c4351a80b)
    (cherry picked from commit 03f5ea8eb1e1edd6502cb5df2d28b618422a572b)
---
 .../http/asyncclient/AsyncHTTPConduit.java         | 32 +++++++++----
 .../AsyncHttpResponseWrapperFactory.java           | 55 ++++++++++++++++++++++
 .../http/asyncclient/AsyncHTTPConduitTest.java     | 38 ++++++++++++++-
 .../http/asyncclient/hc5/AsyncHTTPConduit.java     | 21 ++++++++-
 .../hc5/AsyncHttpResponseWrapperFactory.java       | 55 ++++++++++++++++++++++
 .../http/asyncclient/hc5/AsyncHTTPConduitTest.java | 38 ++++++++++++++-
 6 files changed, 227 insertions(+), 12 deletions(-)

diff --git 
a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
 
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
index b0294ad7ee..e0a1122206 100755
--- 
a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
+++ 
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
@@ -66,6 +66,7 @@ import org.apache.cxf.transport.http.Address;
 import org.apache.cxf.transport.http.Headers;
 import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
 import 
org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory.UseAsyncPolicy;
+import 
org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper;
 import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.version.Version;
@@ -99,13 +100,15 @@ import org.apache.http.nio.util.HeapByteBufferAllocator;
 public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
     public static final String USE_ASYNC = "use.async.http.conduit";
 
-    final AsyncHTTPConduitFactory factory;
-    volatile int lastTlsHash = -1;
-    volatile Object sslState;
-    volatile URI sslURL;
-    volatile SSLContext sslContext;
-    volatile SSLSession session;
-    volatile CloseableHttpAsyncClient client;
+    private final AsyncHTTPConduitFactory factory;
+    private final AsyncHttpResponseWrapperFactory 
asyncHttpResponseWrapperFactory;
+
+    private volatile int lastTlsHash = -1;
+    private volatile Object sslState;
+    private volatile URI sslURL;
+    private volatile SSLContext sslContext;
+    private volatile SSLSession session;
+    private volatile CloseableHttpAsyncClient client;
 
 
     public AsyncHTTPConduit(Bus b,
@@ -114,6 +117,7 @@ public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit {
                             AsyncHTTPConduitFactory factory) throws 
IOException {
         super(b, ei, t);
         this.factory = factory;
+        this.asyncHttpResponseWrapperFactory = 
bus.getExtension(AsyncHttpResponseWrapperFactory.class);
     }
 
     public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws 
IOException {
@@ -478,7 +482,7 @@ public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit {
                 return;
             }
 
-            CXFResponseCallback responseCallback = new CXFResponseCallback() {
+            CXFResponseCallback delegate = new CXFResponseCallback() {
                 @Override
                 public void responseReceived(HttpResponse response) {
                     setHttpResponse(response);
@@ -486,6 +490,18 @@ public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit {
 
             };
 
+            CXFResponseCallback responseCallback = delegate;
+            if (asyncHttpResponseWrapperFactory != null) {
+                final AsyncHttpResponseWrapper wrapper = 
asyncHttpResponseWrapperFactory.create();
+                if (wrapper != null) {
+                    responseCallback = new CXFResponseCallback() {
+                        @Override
+                        public void responseReceived(HttpResponse response) {
+                            wrapper.responseReceived(response, 
delegate::responseReceived);
+                        }
+                    };
+                }
+            }
             FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
 
                 public void completed(Boolean result) {
diff --git 
a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java
 
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java
new file mode 100644
index 0000000000..bec9e695fe
--- /dev/null
+++ 
b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.util.function.Consumer;
+
+import org.apache.cxf.Bus;
+import org.apache.http.HttpResponse;
+
+/**
+ * The {@link Bus} extension to allow wrapping up the response processing of 
+ * the {@link AsyncHTTPConduit} instance.
+ */
+@FunctionalInterface
+public interface AsyncHttpResponseWrapperFactory {
+    /** 
+     * Creates new instance of the {@link AsyncHttpResponseWrapper} 
+     * @return new instance of the {@link AsyncHttpResponseWrapper} (or null)
+     */
+    AsyncHttpResponseWrapper create();
+
+    /**
+     * The wrapper around the response that will be called by the {@link 
AsyncHTTPConduit} 
+     * instance once the response is received. 
+     */
+    interface AsyncHttpResponseWrapper {
+        /**
+         * The callback which is called by the {@link AsyncHTTPConduit} 
instance once 
+         * the response is received. The delegating response handler is passed 
as
+         * an argument and has to be called.
+         * @param response the response received
+         * @param delegate delegating response handler
+         */
+        default void responseReceived(HttpResponse response, 
Consumer<HttpResponse> delegate) {
+            delegate.accept(response);
+        }
+    }
+}
diff --git 
a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
 
b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
index ec412163f6..7f40706620 100644
--- 
a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
+++ 
b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Endpoint;
@@ -44,6 +45,7 @@ import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transport.http.HTTPConduitFactory;
+import 
org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
 import org.apache.cxf.workqueue.WorkQueueManager;
@@ -51,12 +53,15 @@ import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.SOAPService;
 import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
 import org.apache.hello_world_soap_http.types.GreetMeResponse;
+import org.apache.http.HttpResponse;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -243,7 +248,7 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
     }
 
     @Test
-    public void testInovationWithHCAddress() throws Exception {
+    public void testInvocationWithHCAddress() throws Exception {
         String address = "hc://http://localhost:" + PORT + 
"/SoapContext/SoapPort";
         JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
         factory.setServiceClass(Greeter.class);
@@ -264,6 +269,7 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
         String response = greeter.greetMe("test");
         assertEquals("Get a wrong response", "Hello test", response);
     }
+
     @Test
     public void testCall() throws Exception {
         updateAddressPort(g, PORT);
@@ -274,6 +280,7 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
         c.setClient(cp);
         assertEquals("Hello " + request, g.greetMe(request));
     }
+
     @Test
     public void testCallAsync() throws Exception {
         updateAddressPort(g, PORT);
@@ -294,6 +301,35 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
         }).get();
     }
 
+    @Test
+    public void testCallAsyncWithResponseWrapper() throws Exception {
+        try {
+            final CountDownLatch latch = new CountDownLatch(1);
+            final AsyncHttpResponseWrapper wrapper = new 
AsyncHttpResponseWrapper() {
+                @Override
+                public void responseReceived(HttpResponse response, 
Consumer<HttpResponse> delegate) {
+                    delegate.accept(response);
+                    latch.countDown();
+                }
+            };
+
+            getStaticBus().setExtension(() -> wrapper, 
AsyncHttpResponseWrapperFactory.class);
+    
+            final String address = "hc://http://localhost:" + PORT + 
"/SoapContext/SoapPort";
+            final Greeter greeter = new SOAPService().getSoapPort();
+            setAddress(greeter, address);
+
+            greeter.greetMeLaterAsync(1000, new 
AsyncHandler<GreetMeLaterResponse>() {
+                public void handleResponse(Response<GreetMeLaterResponse> res) 
{
+                }
+            }).get();
+
+            assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
+        } finally {
+            getStaticBus().setExtension(null, 
AsyncHttpResponseWrapperFactory.class);
+        }
+    }
+
     @Test
     public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception {
         // This test is especially targeted for RHEL 6.8
diff --git 
a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
 
b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
index 7e721628f0..010ef1b9aa 100644
--- 
a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
+++ 
b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
@@ -66,6 +66,7 @@ import org.apache.cxf.transport.http.Address;
 import org.apache.cxf.transport.http.Headers;
 import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
 import 
org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory.UseAsyncPolicy;
+import 
org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper;
 import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.version.Version;
@@ -105,6 +106,7 @@ public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit {
     public static final String USE_ASYNC = "use.async.http.conduit";
 
     private final AsyncHTTPConduitFactory factory;
+    private final AsyncHttpResponseWrapperFactory 
asyncHttpResponseWrapperFactory;
     private volatile int lastTlsHash = -1;
     private volatile Object sslState;
     private volatile URI sslURL;
@@ -116,6 +118,7 @@ public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit {
             throws IOException {
         super(b, ei, t);
         this.factory = factory;
+        this.asyncHttpResponseWrapperFactory = 
bus.getExtension(AsyncHttpResponseWrapperFactory.class);
     }
 
     public synchronized CloseableHttpAsyncClient getHttpAsyncClient(final 
TlsStrategy tlsStrategy)
@@ -491,8 +494,8 @@ public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit {
             if (connectionFuture != null) {
                 return;
             }
-
-            CXFResponseCallback responseCallback = new CXFResponseCallback() {
+            
+            final CXFResponseCallback delegate = new CXFResponseCallback() {
                 @Override
                 public void responseReceived(HttpResponse response) {
                     setHttpResponse(response);
@@ -500,6 +503,20 @@ public class AsyncHTTPConduit extends 
URLConnectionHTTPConduit {
 
             };
 
+            CXFResponseCallback responseCallback = delegate;
+            if (asyncHttpResponseWrapperFactory != null) {
+                final AsyncHttpResponseWrapper wrapper = 
asyncHttpResponseWrapperFactory.create();
+                if (wrapper != null) {
+                    responseCallback = new CXFResponseCallback() {
+                        @Override
+                        public void responseReceived(HttpResponse response) {
+                            wrapper.responseReceived(response, 
delegate::responseReceived);
+                        }
+                    };
+                }
+            }
+
+
             FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
 
                 public void completed(Boolean result) {
diff --git 
a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java
 
b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java
new file mode 100644
index 0000000000..eb0725c4c8
--- /dev/null
+++ 
b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.util.function.Consumer;
+
+import org.apache.cxf.Bus;
+import org.apache.hc.core5.http.HttpResponse;
+
+/**
+ * The {@link Bus} extension to allow wrapping up the response processing of 
+ * the {@link AsyncHTTPConduit} instance.
+ */
+@FunctionalInterface
+public interface AsyncHttpResponseWrapperFactory {
+    /** 
+     * Creates new instance of the {@link AsyncHttpResponseWrapper} 
+     * @return new instance of the {@link AsyncHttpResponseWrapper} (or null)
+     */
+    AsyncHttpResponseWrapper create();
+
+    /**
+     * The wrapper around the response that will be called by the {@link 
AsyncHTTPConduit} 
+     * instance once the response is received. 
+     */
+    interface AsyncHttpResponseWrapper {
+        /**
+         * The callback which is called by the {@link AsyncHTTPConduit} 
instance once 
+         * the response is received. The delegating response handler is passed 
as
+         * an argument and has to be called.
+         * @param response the response received
+         * @param delegate delegating response handler
+         */
+        default void responseReceived(HttpResponse response, 
Consumer<HttpResponse> delegate) {
+            delegate.accept(response);
+        }
+    }
+}
diff --git 
a/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java
 
b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java
index 273f534116..13615860d4 100644
--- 
a/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java
+++ 
b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Endpoint;
@@ -45,9 +46,11 @@ import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transport.http.HTTPConduitFactory;
+import 
org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
 import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.SOAPService;
 import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
@@ -57,6 +60,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -217,7 +222,7 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
     }
 
     @Test
-    public void testInovationWithHCAddress() throws Exception {
+    public void testInvocationWithHCAddress() throws Exception {
         String address = "hc://http://localhost:" + PORT + 
"/SoapContext/SoapPort";
         JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
         factory.setServiceClass(Greeter.class);
@@ -238,6 +243,7 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
         String response = greeter.greetMe("test");
         assertEquals("Get a wrong response", "Hello test", response);
     }
+
     @Test
     public void testCall() throws Exception {
         updateAddressPort(g, PORT);
@@ -248,6 +254,7 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
         c.setClient(cp);
         assertEquals("Hello " + request, g.greetMe(request));
     }
+
     @Test
     public void testCallAsync() throws Exception {
         updateAddressPort(g, PORT);
@@ -268,6 +275,35 @@ public class AsyncHTTPConduitTest extends 
AbstractBusClientServerTestBase {
         }).get();
     }
 
+    @Test
+    public void testCallAsyncWithResponseWrapper() throws Exception {
+        try {
+            final CountDownLatch latch = new CountDownLatch(1);
+            final AsyncHttpResponseWrapper wrapper = new 
AsyncHttpResponseWrapper() {
+                @Override
+                public void responseReceived(HttpResponse response, 
Consumer<HttpResponse> delegate) {
+                    delegate.accept(response);
+                    latch.countDown();
+                }
+            };
+
+            getStaticBus().setExtension(() -> wrapper, 
AsyncHttpResponseWrapperFactory.class);
+
+            final String address = "hc://http://localhost:" + PORT + 
"/SoapContext/SoapPort";
+            final Greeter greeter = new SOAPService().getSoapPort();
+            setAddress(greeter, address);
+    
+            greeter.greetMeLaterAsync(1000, new 
AsyncHandler<GreetMeLaterResponse>() {
+                public void handleResponse(Response<GreetMeLaterResponse> res) 
{
+                }
+            }).get();
+
+            assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
+        } finally {
+            getStaticBus().setExtension(null, 
AsyncHttpResponseWrapperFactory.class);
+        }
+    }
+
     @Test
     public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception {
         // This test is especially targeted for RHEL 6.8

Reply via email to