This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.6.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 362d16cc0d71445521e319126845a0943cff7447 Author: Andriy Redko <drr...@gmail.com> AuthorDate: Wed Apr 19 07:59:29 2023 -0400 CXF-8830: Fix org.apache.cxf.transport.http.netty.client.NettyHttpConduitTest.testCallAsyncCallbackInvokedOnlyOnce (#1214) (cherry picked from commit 9cb9eb0cfc3c6628793129c7660e0e6de557c963) --- .../hc5/CXFHttpAsyncResponseConsumer.java | 2 + .../http/netty/client/NettyHttpClientHandler.java | 2 +- .../http/netty/client/NettyHttpConduit.java | 166 ++++++++++++--------- .../http/netty/client/NettyHttpConduitTest.java | 3 + 4 files changed, 105 insertions(+), 68 deletions(-) diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java index d8851635ba..5808de4870 100644 --- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java +++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java @@ -82,6 +82,8 @@ public class CXFHttpAsyncResponseConsumer implements AsyncResponseConsumer<Boole buf.consumeContent(src, completed); break; } + + Thread.onSpinWait(); } } diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java index 1a357b42fa..e92117b3c5 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java @@ -62,7 +62,7 @@ public class NettyHttpClientHandler extends ChannelDuplexHandler { if (msg instanceof NettyHttpClientRequest) { NettyHttpClientRequest request = (NettyHttpClientRequest)msg; sendedQueue.put(request); - ctx.writeAndFlush(request.getRequest()); + ctx.writeAndFlush(request.getRequest(), promise); } else { super.write(ctx, msg, promise); } diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java index cd358d4314..de94af8ddf 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java @@ -37,6 +37,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import javax.net.ssl.HostnameVerifier; @@ -222,6 +226,10 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif ByteBuf outBuffer; OutputStream outputStream; + final Lock syncLock = new ReentrantLock(); + final Condition connected = syncLock.newCondition(); + final Condition responded = syncLock.newCondition(); + protected NettyWrappedOutputStream(Message message, boolean possibleRetransmit, boolean isChunking, int chunkThreshold, String conduitName, URI url) { super(message, possibleRetransmit, isChunking, chunkThreshold, conduitName, url); @@ -237,31 +245,36 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif } - protected synchronized HttpResponse getHttpResponse() throws IOException { - while (httpResponse == null) { - if (exception == null) { //already have an exception, skip waiting - try { - wait(entity.getReceiveTimeout()); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - if (httpResponse == null) { - - if (exception != null) { - if (exception instanceof IOException) { - throw (IOException)exception; + protected HttpResponse getHttpResponse() throws IOException { + syncLock.lock(); + try { + while (httpResponse == null) { + if (exception == null) { //already have an exception, skip waiting + try { + responded.await(entity.getReceiveTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new IOException(e); } - if (exception instanceof RuntimeException) { - throw (RuntimeException)exception; + } + if (httpResponse == null) { + + if (exception != null) { + if (exception instanceof IOException) { + throw (IOException)exception; + } + if (exception instanceof RuntimeException) { + throw (RuntimeException)exception; + } + throw new IOException(exception); } - throw new IOException(exception); + + throw new SocketTimeoutException("Read Timeout"); } - - throw new SocketTimeoutException("Read Timeout"); } + return httpResponse; + } finally { + syncLock.unlock(); } - return httpResponse; } protected HttpContent getHttpResponseContent() throws IOException { @@ -269,33 +282,37 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif } - protected synchronized Channel getChannel() throws IOException { - while (channel == null) { - if (exception == null) { //already have an exception, skip waiting - try { - // connection timeout - wait(entity.getConnectionTimeout()); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - if (channel == null) { - - if (exception != null) { - if (exception instanceof IOException) { - throw (IOException)exception; + protected Channel getChannel() throws IOException { + syncLock.lock(); + try { + while (channel == null) { + if (exception == null) { //already have an exception, skip waiting + try { + // connection timeout + connected.await(entity.getConnectionTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new IOException(e); } - if (exception instanceof RuntimeException) { - throw (RuntimeException)exception; + } + if (channel == null) { + + if (exception != null) { + if (exception instanceof IOException) { + throw (IOException)exception; + } + if (exception instanceof RuntimeException) { + throw (RuntimeException)exception; + } + throw new IOException(exception); } - throw new IOException(exception); + + throw new SocketTimeoutException("Connection Timeout"); } - - throw new SocketTimeoutException("Connection Timeout"); } + return channel; + } finally { + syncLock.unlock(); } - return channel; - } @@ -655,37 +672,52 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif //entity.getRequest().setChunked(true); } - protected synchronized void setHttpResponse(HttpResponse r) { - httpResponse = r; - if (isAsync) { - //got a response, need to start the response processing now - try { - handleResponseOnWorkqueue(false, true); - isAsync = false; // don't trigger another start on next block. :-) - } catch (Exception ex) { - //ignore, we'll try again on the next consume; + protected void setHttpResponse(HttpResponse r) { + syncLock.lock(); + try { + httpResponse = r; + if (isAsync) { + //got a response, need to start the response processing now + try { + handleResponseOnWorkqueue(false, true); + isAsync = false; // don't trigger another start on next block. :-) + } catch (Exception ex) { + //ignore, we'll try again on the next consume; + } } + responded.signalAll(); + } finally { + syncLock.unlock(); } - notifyAll(); - } - - protected synchronized void setException(Throwable ex) { - exception = ex; - if (isAsync) { - //got a response, need to start the response processing now - try { - handleResponseOnWorkqueue(false, true); - isAsync = false; // don't trigger another start on next block. :-) - } catch (Exception ex2) { - ex2.printStackTrace(); + } + + protected void setException(Throwable ex) { + syncLock.lock(); + try { + exception = ex; + if (isAsync) { + //got a response, need to start the response processing now + try { + handleResponseOnWorkqueue(false, true); + isAsync = false; // don't trigger another start on next block. :-) + } catch (Exception ex2) { + ex2.printStackTrace(); + } } + responded.signalAll(); + } finally { + syncLock.unlock(); } - notifyAll(); } - protected synchronized void setChannel(Channel ch) { - channel = ch; - notifyAll(); + protected void setChannel(Channel ch) { + syncLock.lock(); + try { + channel = ch; + connected.signalAll(); + } finally { + syncLock.unlock(); + } } } diff --git a/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java index 7077842312..1ef3e1c515 100644 --- a/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java +++ b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java @@ -22,7 +22,9 @@ package org.apache.cxf.transport.http.netty.client; import java.net.URL; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import javax.xml.ws.AsyncHandler; import javax.xml.ws.Endpoint; @@ -89,6 +91,7 @@ public class NettyHttpConduitTest extends AbstractBusClientServerTestBase { return "Hello, finally! " + cnt; } public String greetMe(String me) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L)); if (me.equals(FILL_BUFFER)) { return String.join("", Collections.nCopies(16093, " ")); } else {