This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.3.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 504b31611e70ca26b968cf9d96d1dc3d7c920aab Author: Andriy Redko <[email protected]> AuthorDate: Sun May 17 12:41:00 2020 -0400 CXF-8282: Set read timeout using netty client (#673) (cherry picked from commit f1f312b6dcd3770a52d1dd42e260a4ac26c779b7) --- .../http/netty/client/CxfResponseCallBack.java | 2 +- .../http/netty/client/NettyHttpClientHandler.java | 12 +- .../client/NettyHttpClientPipelineFactory.java | 14 +- .../http/netty/client/NettyHttpConduit.java | 10 +- .../http/netty/client/NettyHttpConduitTest.java | 265 +++++++++++++++++++++ 5 files changed, 295 insertions(+), 8 deletions(-) diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java index baa4f62..1a0bc09 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java @@ -22,6 +22,6 @@ package org.apache.cxf.transport.http.netty.client; import io.netty.handler.codec.http.HttpResponse; public interface CxfResponseCallBack { - void responseReceived(HttpResponse response); + void error(Throwable ex); } diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java index 160d99e..dbcdc16 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java @@ -19,6 +19,7 @@ package org.apache.cxf.transport.http.netty.client; +import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; @@ -26,6 +27,7 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.timeout.ReadTimeoutException; public class NettyHttpClientHandler extends ChannelDuplexHandler { private final BlockingQueue<NettyHttpClientRequest> sendedQueue = @@ -49,7 +51,6 @@ public class NettyHttpClientHandler extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - // need to deal with the request if (msg instanceof NettyHttpClientRequest) { NettyHttpClientRequest request = (NettyHttpClientRequest)msg; @@ -61,9 +62,12 @@ public class NettyHttpClientHandler extends ChannelDuplexHandler { } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - //TODO need to handle the exception here + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (cause instanceof ReadTimeoutException) { + final NettyHttpClientRequest request = sendedQueue.poll(); + request.getCxfResponseCallback().error(new IOException(cause)); + } + cause.printStackTrace(); ctx.close(); } diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java index bc52ac7..2196016 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java @@ -19,6 +19,7 @@ package org.apache.cxf.transport.http.netty.client; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,6 +37,7 @@ import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> { @@ -43,9 +45,15 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> private static final Logger LOG = LogUtils.getL7dLogger(NettyHttpClientPipelineFactory.class); private final TLSClientParameters tlsClientParameters; - + private final int readTimeout; + public NettyHttpClientPipelineFactory(TLSClientParameters clientParameters) { + this(clientParameters, 0); + } + + public NettyHttpClientPipelineFactory(TLSClientParameters clientParameters, int readTimeout) { this.tlsClientParameters = clientParameters; + this.readTimeout = readTimeout; } @Override @@ -66,9 +74,13 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> pipeline.addLast("aggregator", new HttpObjectAggregator(1048576)); pipeline.addLast("encoder", new HttpRequestEncoder()); pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + if (readTimeout > 0) { + pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS)); + } pipeline.addLast("client", new NettyHttpClientHandler()); } + private SslHandler configureClientSSLOnDemand() throws Exception { if (tlsClientParameters != null) { SSLEngine sslEngine = SSLUtils.createClientSSLEngine(tlsClientParameters); diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java index cef980e..1e11c36 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java @@ -337,10 +337,11 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif protected void connect(boolean output) { if ("https".equals(url.getScheme())) { TLSClientParameters clientParameters = findTLSClientParameters(); - bootstrap.handler(new NettyHttpClientPipelineFactory(clientParameters)); + bootstrap.handler(new NettyHttpClientPipelineFactory(clientParameters, entity.getReceiveTimeout())); } else { - bootstrap.handler(new NettyHttpClientPipelineFactory(null)); + bootstrap.handler(new NettyHttpClientPipelineFactory(null, entity.getReceiveTimeout())); } + ChannelFuture connFuture = bootstrap.connect(new InetSocketAddress(url.getHost(), url.getPort() != -1 ? url.getPort() : "http".equals(url.getScheme()) ? 80 : 443)); @@ -381,6 +382,11 @@ public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLif public void responseReceived(HttpResponse response) { setHttpResponse(response); } + + @Override + public void error(Throwable ex) { + setException(ex); + } }; entity.setCxfResponseCallback(callBack); diff --git a/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java new file mode 100644 index 0000000..b270445 --- /dev/null +++ b/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitTest.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.http.netty.client; + +import java.net.URL; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Endpoint; +import javax.xml.ws.Response; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.continuations.Continuation; +import org.apache.cxf.continuations.ContinuationProvider; +import org.apache.cxf.frontend.ClientProxy; +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.transport.http.HTTPConduit; +import org.apache.hello_world_soap_http.Greeter; +import org.apache.hello_world_soap_http.SOAPService; +import org.apache.hello_world_soap_http.types.GreetMeLaterResponse; +import org.apache.hello_world_soap_http.types.GreetMeResponse; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class NettyHttpConduitTest extends AbstractBusClientServerTestBase { + public static final String PORT = allocatePort(NettyHttpConduitTest.class); + public static final String PORT_INV = allocatePort(NettyHttpConduitTest.class, 2); + public static final String FILL_BUFFER = "FillBuffer"; + + private Endpoint ep; + private String request; + private Greeter g; + + @Before + public void start() throws Exception { + Bus b = createStaticBus(); + b.setProperty(NettyHttpConduit.USE_ASYNC, NettyHttpConduitFactory.UseAsyncPolicy.ALWAYS); + + BusFactory.setThreadDefaultBus(b); + + ep = Endpoint.publish("http://localhost:" + PORT + "/SoapContext/SoapPort", + new org.apache.hello_world_soap_http.GreeterImpl() { + public String greetMeLater(long cnt) { + //use the continuations so the async client can + //have a ton of connections, use less threads + // + //mimic a slow server by delaying somewhere between + //1 and 2 seconds, with a preference of delaying the earlier + //requests longer to create a sort of backlog/contention + //with the later requests + ContinuationProvider p = (ContinuationProvider) + getContext().getMessageContext().get(ContinuationProvider.class.getName()); + Continuation c = p.getContinuation(); + if (c.isNew()) { + if (cnt < 0) { + c.suspend(-cnt); + } else { + c.suspend(2000 - (cnt % 1000)); + } + return null; + } + return "Hello, finally! " + cnt; + } + public String greetMe(String me) { + if (me.equals(FILL_BUFFER)) { + return String.join("", Collections.nCopies(16093, " ")); + } else { + return "Hello " + me; + } + } + }); + + StringBuilder builder = new StringBuilder("NaNaNa"); + for (int x = 0; x < 50; x++) { + builder.append(" NaNaNa "); + } + request = builder.toString(); + + URL wsdl = NettyHttpConduitTest.class.getResource("/wsdl/hello_world_services.wsdl"); + assertNotNull("WSDL is null", wsdl); + + SOAPService service = new SOAPService(); + assertNotNull("Service is null", service); + + g = service.getSoapPort(); + assertNotNull("Port is null", g); + } + + @After + public void stop() throws Exception { + ((java.io.Closeable)g).close(); + ep.stop(); + ep = null; + } + + @Test + public void testResponseSameBufferSize() throws Exception { + updateAddressPort(g, PORT); + HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit(); + c.getClient().setReceiveTimeout(12000); + try { + g.greetMe(FILL_BUFFER); + g.greetMe("Hello"); + } catch (Exception ex) { + fail(); + } + } + + @Test + public void testTimeout() throws Exception { + updateAddressPort(g, PORT); + HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit(); + c.getClient().setReceiveTimeout(3000); + try { + assertEquals("Hello " + request, g.greetMeLater(-5000)); + fail(); + } catch (Exception ex) { + //expected!!! + } + } + + + @Test + public void testTimeoutWithPropertySetting() throws Exception { + ((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout", + "3000"); + updateAddressPort(g, PORT); + + try { + assertEquals("Hello " + request, g.greetMeLater(-5000)); + fail(); + } catch (Exception ex) { + //expected!!! + } + } + + @Test + public void testTimeoutAsync() throws Exception { + updateAddressPort(g, PORT); + HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit(); + c.getClient().setReceiveTimeout(3000); + c.getClient().setAsyncExecuteTimeout(3000); + try { + Response<GreetMeLaterResponse> future = g.greetMeLaterAsync(-5000L); + future.get(); + fail(); + } catch (Exception ex) { + //expected!!! + } + } + + @Test + public void testTimeoutAsyncWithPropertySetting() throws Exception { + updateAddressPort(g, PORT); + ((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout", + "3000"); + try { + Response<GreetMeLaterResponse> future = g.greetMeLaterAsync(-5000L); + future.get(); + fail(); + } catch (Exception ex) { + //expected!!! + } + } + + @Test + public void testConnectIssue() throws Exception { + updateAddressPort(g, PORT_INV); + try { + g.greetMe(request); + fail("should have connect exception"); + } catch (Exception ex) { + //expected + } + } + + @Test + public void testInovationWithNettyAddress() throws Exception { + String address = "netty://http://localhost:" + PORT + "/SoapContext/SoapPort"; + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setServiceClass(Greeter.class); + factory.setAddress(address); + Greeter greeter = factory.create(Greeter.class); + String response = greeter.greetMe("test"); + assertEquals("Get a wrong response", "Hello test", response); + } + + @Test + public void testInvocationWithTransportId() throws Exception { + String address = "http://localhost:" + PORT + "/SoapContext/SoapPort"; + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setServiceClass(Greeter.class); + factory.setAddress(address); + factory.setTransportId("http://cxf.apache.org/transports/http/netty/client"); + Greeter greeter = factory.create(Greeter.class); + String response = greeter.greetMe("test"); + assertEquals("Get a wrong response", "Hello test", response); + } + + @Test + public void testCallAsync() throws Exception { + updateAddressPort(g, PORT); + GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>() { + public void handleResponse(Response<GreetMeResponse> res) { + try { + res.get().getResponseType(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + }).get(); + assertEquals("Hello " + request, resp.getResponseType()); + + g.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() { + public void handleResponse(Response<GreetMeLaterResponse> res) { + } + }).get(); + } + + @Test + public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception { + updateAddressPort(g, PORT_INV); + int repeat = 20; + final AtomicInteger count = new AtomicInteger(0); + for (int i = 0; i < repeat; i++) { + try { + g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>() { + public void handleResponse(Response<GreetMeResponse> res) { + count.incrementAndGet(); + } + }).get(); + } catch (Exception e) { + } + } + Thread.sleep(1000); + assertEquals("Callback should be invoked only once per request", repeat, count.intValue()); + } +}
