This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.6.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 362d16cc0d71445521e319126845a0943cff7447
Author: Andriy Redko <drr...@gmail.com>
AuthorDate: Wed Apr 19 07:59:29 2023 -0400

    CXF-8830: Fix org.apache.cxf.transport.http.netty.client.NettyHttpConduitTest.testCallAsyncCallbackInvokedOnlyOnce (#1214)
    
    (cherry picked from commit 9cb9eb0cfc3c6628793129c7660e0e6de557c963)
---
 .../hc5/CXFHttpAsyncResponseConsumer.java          |   2 +
 .../http/netty/client/NettyHttpClientHandler.java  |   2 +-
 .../http/netty/client/NettyHttpConduit.java        | 166 ++++++++++++---------
 .../http/netty/client/NettyHttpConduitTest.java    |   3 +
 4 files changed, 105 insertions(+), 68 deletions(-)

diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java
index d8851635ba..5808de4870 100644
--- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java
@@ -82,6 +82,8 @@ public class CXFHttpAsyncResponseConsumer implements AsyncResponseConsumer<Boole
                 buf.consumeContent(src, completed);
                 break;
             }
+
+            Thread.onSpinWait();
         }
     }
 
diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
index 1a357b42fa..e92117b3c5 100644
--- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
+++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
@@ -62,7 +62,7 @@ public class NettyHttpClientHandler extends ChannelDuplexHandler {
         if (msg instanceof NettyHttpClientRequest) {
             NettyHttpClientRequest request = (NettyHttpClientRequest)msg;
             sendedQueue.put(request);
-            ctx.writeAndFlush(request.getRequest());
+            ctx.writeAndFlush(request.getRequest(), promise);
         } else {
             super.write(ctx, msg, promise);
         }
diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
index cd358d4314..de94af8ddf 100644
--- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
+++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
@@ -37,6 +37,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 
 import javax.net.ssl.HostnameVerifier;
@@ -222,6 +226,10 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif
         ByteBuf outBuffer;
         OutputStream outputStream;
 
+        final Lock syncLock = new ReentrantLock();
+        final Condition connected = syncLock.newCondition();
+        final Condition responded = syncLock.newCondition();
+
         protected NettyWrappedOutputStream(Message message, boolean possibleRetransmit,
                                            boolean isChunking, int chunkThreshold, String conduitName, URI url) {
             super(message, possibleRetransmit, isChunking, chunkThreshold, conduitName, url);
@@ -237,31 +245,36 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif
         }
 
 
-        protected synchronized HttpResponse getHttpResponse() throws IOException {
-            while (httpResponse == null) {
-                if (exception == null) { //already have an exception, skip waiting
-                    try {
-                        wait(entity.getReceiveTimeout());
-                    } catch (InterruptedException e) {
-                        throw new IOException(e);
-                    }
-                }
-                if (httpResponse == null) {
-
-                    if (exception != null) {
-                        if (exception instanceof IOException) {
-                            throw (IOException)exception;
+        protected HttpResponse getHttpResponse() throws IOException {
+            syncLock.lock();
+            try {
+                while (httpResponse == null) {
+                    if (exception == null) { //already have an exception, skip waiting
+                        try {
+                            responded.await(entity.getReceiveTimeout(), TimeUnit.MILLISECONDS);
+                        } catch (InterruptedException e) {
+                            throw new IOException(e);
                         }
-                        if (exception instanceof RuntimeException) {
-                            throw (RuntimeException)exception;
+                    }
+                    if (httpResponse == null) {
+    
+                        if (exception != null) {
+                            if (exception instanceof IOException) {
+                                throw (IOException)exception;
+                            }
+                            if (exception instanceof RuntimeException) {
+                                throw (RuntimeException)exception;
+                            }
+                            throw new IOException(exception);
                         }
-                        throw new IOException(exception);
+    
+                        throw new SocketTimeoutException("Read Timeout");
                     }
-
-                    throw new SocketTimeoutException("Read Timeout");
                 }
+                return httpResponse;
+            } finally {
+                syncLock.unlock();
             }
-            return httpResponse;
         }
 
         protected HttpContent getHttpResponseContent() throws IOException {
@@ -269,33 +282,37 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif
         }
 
 
-        protected synchronized Channel getChannel() throws IOException {
-            while (channel == null) {
-                if (exception == null) { //already have an exception, skip waiting
-                    try {
-                        // connection timeout
-                        wait(entity.getConnectionTimeout());
-                    } catch (InterruptedException e) {
-                        throw new IOException(e);
-                    }
-                }
-                if (channel == null) {
-
-                    if (exception != null) {
-                        if (exception instanceof IOException) {
-                            throw (IOException)exception;
+        protected Channel getChannel() throws IOException {
+            syncLock.lock();
+            try {
+                while (channel == null) {
+                    if (exception == null) { //already have an exception, skip waiting
+                        try {
+                            // connection timeout
+                            connected.await(entity.getConnectionTimeout(), TimeUnit.MILLISECONDS);
+                        } catch (InterruptedException e) {
+                            throw new IOException(e);
                         }
-                        if (exception instanceof RuntimeException) {
-                            throw (RuntimeException)exception;
+                    }
+                    if (channel == null) {
+    
+                        if (exception != null) {
+                            if (exception instanceof IOException) {
+                                throw (IOException)exception;
+                            }
+                            if (exception instanceof RuntimeException) {
+                                throw (RuntimeException)exception;
+                            }
+                            throw new IOException(exception);
                         }
-                        throw new IOException(exception);
+    
+                        throw new SocketTimeoutException("Connection Timeout");
                     }
-
-                    throw new SocketTimeoutException("Connection Timeout");
                 }
+                return channel;
+            } finally {
+                syncLock.unlock();
             }
-            return channel;
-
         }
 
 
@@ -655,37 +672,52 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif
             //entity.getRequest().setChunked(true);
         }
 
-        protected synchronized void setHttpResponse(HttpResponse r) {
-            httpResponse = r;
-            if (isAsync) {
-                //got a response, need to start the response processing now
-                try {
-                    handleResponseOnWorkqueue(false, true);
-                    isAsync = false; // don't trigger another start on next block. :-)
-                } catch (Exception ex) {
-                    //ignore, we'll try again on the next consume;
+        protected void setHttpResponse(HttpResponse r) {
+            syncLock.lock();
+            try {
+                httpResponse = r;
+                if (isAsync) {
+                    //got a response, need to start the response processing now
+                    try {
+                        handleResponseOnWorkqueue(false, true);
+                        isAsync = false; // don't trigger another start on next block. :-)
+                    } catch (Exception ex) {
+                        //ignore, we'll try again on the next consume;
+                    }
                 }
+                responded.signalAll();
+            } finally {
+                syncLock.unlock();
             }
-            notifyAll();
-        }
-
-        protected synchronized void setException(Throwable ex) {
-            exception = ex;
-            if (isAsync) {
-                //got a response, need to start the response processing now
-                try {
-                    handleResponseOnWorkqueue(false, true);
-                    isAsync = false; // don't trigger another start on next block. :-)
-                } catch (Exception ex2) {
-                    ex2.printStackTrace();
+        }
+
+        protected void setException(Throwable ex) {
+            syncLock.lock();
+            try {
+                exception = ex;
+                if (isAsync) {
+                    //got a response, need to start the response processing now
+                    try {
+                        handleResponseOnWorkqueue(false, true);
+                        isAsync = false; // don't trigger another start on next block. :-)
+                    } catch (Exception ex2) {
+                        ex2.printStackTrace();
+                    }
                 }
+                responded.signalAll();
+            } finally {
+                syncLock.unlock();
             }
-            notifyAll();
         }
 
-        protected synchronized void setChannel(Channel ch) {
-            channel = ch;
-            notifyAll();
+        protected void setChannel(Channel ch) {
+            syncLock.lock();
+            try {
+                channel = ch;
+                connected.signalAll();
+            } finally {
+                syncLock.unlock();
+            }
         }
     }
 
diff --git a/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java
index 7077842312..1ef3e1c515 100644
--- a/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java
+++ b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java
@@ -22,7 +22,9 @@ package org.apache.cxf.transport.http.netty.client;
 import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 
 import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Endpoint;
@@ -89,6 +91,7 @@ public class NettyHttpConduitTest extends AbstractBusClientServerTestBase {
                     return "Hello, finally! " + cnt;
                 }
                 public String greetMe(String me) {
+                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
                     if (me.equals(FILL_BUFFER)) {
                         return String.join("", Collections.nCopies(16093, " "));
                     } else {

Reply via email to