This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/main by this push: new 398a196fe8 CXF-9009: Async operations fails (netty-conduit) (#1868) 398a196fe8 is described below commit 398a196fe87cc7892586b31cfdcab25005ea43e0 Author: Andriy Redko <drr...@gmail.com> AuthorDate: Mon Jun 3 18:32:28 2024 -0400 CXF-9009: Async operations fails (netty-conduit) (#1868) --- .../client/NettyHttpClientPipelineFactory.java | 13 +- .../http/netty/client/NettyHttpConduit.java | 16 +- systests/transport-netty/pom.xml | 18 +- .../http2/netty/jaxws/JAXWSAsyncClientTest.java | 182 +++++++++++++++++++++ 4 files changed, 211 insertions(+), 18 deletions(-) diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java index be44b9c4ec..a39ff9ad51 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java @@ -69,8 +69,11 @@ import io.netty.handler.ssl.util.SimpleKeyManagerFactory; import io.netty.handler.ssl.util.SimpleTrustManagerFactory; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.AttributeKey; public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> { + private static final String WHEN_READY_KEY = "WhenReady-Key"; + private static final AttributeKey<ChannelFuture> WHEN_READY = AttributeKey.valueOf(WHEN_READY_KEY); private static final Logger LOG = LogUtils.getL7dLogger(NettyHttpClientPipelineFactory.class); @@ -79,7 +82,6 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> private final int readTimeout; private final int maxContentLength; private final boolean enableHttp2; - private ChannelPromise readyFuture; public NettyHttpClientPipelineFactory(TLSClientParameters clientParameters) { this(clientParameters, 0); @@ -115,8 +117,9 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> } final NettyHttpClientHandler responseHandler = new NettyHttpClientHandler(); - readyFuture = ch.newPromise(); - + final ChannelPromise readyFuture = ch.newPromise(); + ch.attr(WHEN_READY).set(readyFuture); + if (enableHttp2) { final Http2Connection connection = new DefaultHttp2Connection(false); final Http2SettingsHandler settingsHandler = new Http2SettingsHandler(readyFuture); @@ -188,8 +191,8 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> } } - public ChannelFuture whenReady() { - return readyFuture; + ChannelFuture whenReady(Channel channel) { + return channel.attr(NettyHttpClientPipelineFactory.WHEN_READY).get(); } private SslHandler configureClientSSLOnDemand(Channel channel) throws Exception { diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java index a2bf8e0a17..5173c945d7 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java @@ -458,15 +458,17 @@ public class NettyHttpConduit extends HttpClientHTTPConduit implements BusLifeCy @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - handler.whenReady().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - ChannelFuture channelFuture = future.channel().writeAndFlush(entity); - channelFuture.addListener(writeFailureListener); + handler.whenReady(future.channel()) + .addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + ChannelFuture channelFuture = future.channel().writeAndFlush(entity); + channelFuture.addListener(writeFailureListener); + } } } - }); + ); } } }); diff --git a/systests/transport-netty/pom.xml b/systests/transport-netty/pom.xml index b16c3bb2c8..1cf48fe4d8 100644 --- a/systests/transport-netty/pom.xml +++ b/systests/transport-netty/pom.xml @@ -95,27 +95,32 @@ <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-core</artifactId> - <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-databinding-jaxb</artifactId> - <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-transports-http</artifactId> - <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-frontend-jaxrs</artifactId> - <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-rs-client</artifactId> - <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-frontend-jaxws</artifactId> + <scope>test</scope> </dependency> <dependency> <groupId>com.fasterxml.jackson.jakarta.rs</groupId> @@ -140,7 +145,7 @@ <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-transports-http-netty-client</artifactId> - <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> @@ -192,6 +197,7 @@ <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> + <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> diff --git a/systests/transport-netty/src/test/java/org/apache/cxf/systest/http2/netty/jaxws/JAXWSAsyncClientTest.java b/systests/transport-netty/src/test/java/org/apache/cxf/systest/http2/netty/jaxws/JAXWSAsyncClientTest.java new file mode 100644 index 0000000000..183f8e1f63 --- /dev/null +++ b/systests/transport-netty/src/test/java/org/apache/cxf/systest/http2/netty/jaxws/JAXWSAsyncClientTest.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http2.netty.jaxws; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import jakarta.jws.WebService; +import jakarta.xml.ws.Response; +import jakarta.xml.ws.soap.SOAPFaultException; +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.greeter_control.AbstractGreeterImpl; +import org.apache.cxf.greeter_control.Greeter; +import org.apache.cxf.greeter_control.types.GreetMeResponse; +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; +import org.apache.cxf.transport.http.HTTPConduit; + +import io.netty.handler.timeout.ReadTimeoutException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JAXWSAsyncClientTest extends AbstractBusClientServerTestBase { + static final String PORT = allocatePort(Server.class); + + public static class Server extends AbstractBusTestServerBase { + + protected void run() { + GreeterImpl implementor = new GreeterImpl(); + String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort"; + jakarta.xml.ws.Endpoint.publish(address, implementor); + } + + public static void main(String[] args) { + try { + Server s = new Server(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + + @WebService(serviceName = "BasicGreeterService", + portName = "GreeterPort", + endpointInterface = "org.apache.cxf.greeter_control.Greeter", + targetNamespace = "http://cxf.apache.org/greeter_control", + wsdlLocation = "testutils/greeter_control.wsdl") + public class GreeterImpl extends AbstractGreeterImpl { + @Override + public String greetMe(String arg) { + if ("timeout".equalsIgnoreCase(arg)) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Do nothing + } + } + + return super.greetMe(arg); + } + } + } + + + @BeforeClass + public static void startServers() throws Exception { + assertTrue("server did not launch correctly", launchServer(Server.class, true)); + } + + @AfterClass + public static void stopServers() throws Exception { + stopAllServers(); + } + + @Test + public void testAsyncClient() throws Exception { + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + Response<GreetMeResponse> response = proxy.greetMeAsync("cxf"); + int waitCount = 0; + while (!response.isDone() && waitCount < 15) { + Thread.sleep(1000); + waitCount++; + } + + assertTrue("Response still not received.", response.isDone()); + } + + @Test + public void testAsyncClientConcurrently() throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(3); + + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + final Callable<Object> callable = () -> proxy.greetMeAsync("cxf", resp -> { }).get(5, TimeUnit.SECONDS); + final List<Future<Object>> futures = executor.invokeAll(List.of(callable, callable, callable)); + + for (final Future<?> response: futures) { + int waitCount = 0; + while (!response.isDone() && waitCount < 15) { + Thread.sleep(1000); + waitCount++; + } + } + + assertTrue("Response still not received.", futures.stream().allMatch(Future::isDone)); + for (final Future<?> response: futures) { + assertThat(response.get(), is(not(nullValue()))); + } + executor.shutdown(); + + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testTimeout() throws Exception { + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + HTTPConduit cond = (HTTPConduit)((Client)proxy).getConduit(); + cond.getClient().setReceiveTimeout(500); + + try { + proxy.greetMeAsync("timeout").get(); + fail("Should have faulted"); + } catch (SOAPFaultException ex) { + fail("should not be a SOAPFaultException"); + } catch (ExecutionException ex) { + //expected + assertTrue(ex.getCause().getClass().getName(), + ex.getCause() instanceof IOException + || ex.getCause() instanceof ReadTimeoutException); + } + } +}