This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new 4143e5e QPIDJMS-480: ensure the transport handles disconnect before/during the WebSocket handshake, and apply a timeout to the handshake itself 4143e5e is described below commit 4143e5ea268728af25ed535212df8576ffabc6c7 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Thu Nov 14 17:41:19 2019 +0000 QPIDJMS-480: ensure the transport handles disconnect before/during the WebSocket handshake, and apply a timeout to the handshake itself --- .../jms/transports/netty/NettyTcpTransport.java | 7 +- .../jms/transports/netty/NettyWsTransport.java | 26 ++++++- .../qpid/jms/integration/SslIntegrationTest.java | 25 +++++++ .../qpid/jms/integration/WsIntegrationTest.java | 85 ++++++++++++++++++++++ .../jms/transports/netty/NettyBlackHoleServer.java | 47 ++++++++++++ .../transports/netty/NettySimpleAmqpServer.java | 2 +- 6 files changed, 188 insertions(+), 4 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java index 9f8e826..38d7465 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java @@ -374,10 +374,15 @@ public class NettyTcpTransport implements Transport { listener.onTransportClosed(); }); } + } else if (!closed.get()) { + if (failureCause == null) { + failureCause = new IOException("Connection failed"); + } + connectionFailed(channel, failureCause); } } - protected void handleException(Channel channel, Throwable cause) throws Exception { + protected void handleException(Channel channel, Throwable cause) { LOG.trace("Exception on channel! 
Channel is {}", channel); if (connected.compareAndSet(true, false) && !closed.get()) { LOG.trace("Firing onTransportError listener"); diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java index 8ce8d41..9be49ca 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java @@ -19,6 +19,7 @@ package org.apache.qpid.jms.transports.netty; import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.transports.TransportOptions; @@ -45,6 +46,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.util.concurrent.ScheduledFuture; /** * Netty based WebSockets Transport that wraps and extends the TCP Transport. @@ -53,6 +55,7 @@ public class NettyWsTransport extends NettyTcpTransport { private static final Logger LOG = LoggerFactory.getLogger(NettyWsTransport.class); private static final String AMQP_SUB_PROTOCOL = "amqp"; + private ScheduledFuture<?> handshakeTimeoutFuture; /** * Create a new transport instance @@ -65,7 +68,7 @@ public class NettyWsTransport extends NettyTcpTransport { * should the transport enable an SSL layer. 
*/ public NettyWsTransport(URI remoteLocation, TransportOptions options, boolean secure) { - super(null, remoteLocation, options, secure); + this(null, remoteLocation, options, secure); } /** @@ -136,6 +139,16 @@ public class NettyWsTransport extends NettyTcpTransport { LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel); } + protected void handleChannelInactive(Channel channel) throws Exception { + try { + if (handshakeTimeoutFuture != null) { + handshakeTimeoutFuture.cancel(false); + } + } finally { + super.handleChannelInactive(channel); + } + } + //----- Handle connection events -----------------------------------------// private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> { @@ -158,6 +171,13 @@ public class NettyWsTransport extends NettyTcpTransport { public void channelActive(ChannelHandlerContext context) throws Exception { handshaker.handshake(context.channel()); + handshakeTimeoutFuture = context.executor().schedule(()-> { + LOG.trace("WebSocket handshake timed out! Channel is {}", context.channel()); + if (!handshaker.isHandshakeComplete()) { + NettyWsTransport.super.handleException(channel, new IOException("WebSocket handshake timed out")); + } + }, getTransportOptions().getConnectTimeout(), TimeUnit.MILLISECONDS); + super.channelActive(context); } @@ -170,7 +190,9 @@ public class NettyWsTransport extends NettyTcpTransport { handshaker.finishHandshake(ch, (FullHttpResponse) message); LOG.trace("WebSocket Client connected! {}", ctx.channel()); // Now trigger super processing as we are really connected. 
- NettyWsTransport.super.handleConnected(ch); + if(handshakeTimeoutFuture.cancel(false)) { + NettyWsTransport.super.handleConnected(ch); + } return; } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java index fcae27d..e1668a7 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SslIntegrationTest.java @@ -46,6 +46,7 @@ import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.TransportSupport; +import org.apache.qpid.jms.transports.netty.NettySimpleAmqpServer; import org.junit.Test; import io.netty.handler.ssl.OpenSsl; @@ -495,6 +496,30 @@ public class SslIntegrationTest extends QpidJmsTestCase { doConfigureStoresWithSslSystemPropertiesTestImpl(CLIENT_DN, true); } + @Test(timeout = 30000) + public void testNonSslConnectionFailsToSslServer() throws Exception { + TransportOptions serverOptions = new TransportOptions(); + serverOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE); + serverOptions.setKeyStorePassword(PASSWORD); + serverOptions.setVerifyHost(false); + + try (NettySimpleAmqpServer server = new NettySimpleAmqpServer(serverOptions, true)) { + server.start(); + + JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + server.getServerPort() + "?jms.connectTimeout=25"); + + try { + factory.createConnection(); + fail("should not have connected"); + } + catch (JMSException jmse) { + String message = jmse.getMessage(); + assertNotNull(message); + assertTrue("Unexpected message: " + message, message.contains("Timed out while waiting to connect")); + } + } + } + private void setSslSystemPropertiesForCurrentTest(String keystore, String keystorePassword, String 
truststore, String truststorePassword) { setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE, keystore); setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE_PASSWORD, keystorePassword); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/WsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/WsIntegrationTest.java new file mode 100644 index 0000000..6d5e045 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/WsIntegrationTest.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.jms.integration; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.netty.NettyBlackHoleServer; +import org.apache.qpid.jms.transports.netty.NettySimpleAmqpServer; +import org.junit.Test; + +public class WsIntegrationTest extends QpidJmsTestCase { + + private static final String BROKER_JKS_KEYSTORE = "src/test/resources/broker-jks.keystore"; + private static final String PASSWORD = "password"; + + @Test(timeout = 30000) + public void testNonSslWebSocketConnectionFailsToSslServer() throws Exception { + TransportOptions serverOptions = new TransportOptions(); + serverOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE); + serverOptions.setKeyStorePassword(PASSWORD); + serverOptions.setVerifyHost(false); + + try (NettySimpleAmqpServer server = new NettySimpleAmqpServer(serverOptions, true, false, true)) { + server.start(); + + JmsConnectionFactory factory = new JmsConnectionFactory("amqpws://localhost:" + server.getServerPort()); + + try { + factory.createConnection(); + fail("should not have connected"); + } + catch (JMSException jmse) { + String message = jmse.getMessage(); + assertNotNull(message); + assertTrue("Unexpected message: " + message, message.contains("Connection failed")); + } + } + } + + @Test(timeout = 30000) + public void testWebsocketConnectionToBlackHoleServerTimesOut() throws Exception { + TransportOptions serverOptions = new TransportOptions(); + + try (NettyBlackHoleServer server = new NettyBlackHoleServer(serverOptions, false)) { + server.start(); + + JmsConnectionFactory factory = new JmsConnectionFactory("amqpws://localhost:" + server.getServerPort() + "?transport.connectTimeout=25"); + + try { + 
factory.createConnection(); + fail("should not have connected"); + } + catch (JMSException jmse) { + String message = jmse.getMessage(); + assertNotNull(message); + assertTrue("Unexpected message: " + message, message.contains("WebSocket handshake timed out")); + } + } + } +} diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyBlackHoleServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyBlackHoleServer.java new file mode 100644 index 0000000..78397c9 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyBlackHoleServer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports.netty; + +import org.apache.qpid.jms.transports.TransportOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class NettyBlackHoleServer extends NettyServer { + + private static final Logger LOG = LoggerFactory.getLogger(NettyBlackHoleServer.class); + + public NettyBlackHoleServer(TransportOptions options, boolean secure) { + super(options, secure); + } + + @Override + protected ChannelHandler getServerHandler() { + return new BlackHoleInboundHandler(); + } + + private class BlackHoleInboundHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + LOG.trace("BlackHoleInboundHandler: Channel read, dropping: {}", msg); + } + } +} diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java index e5054d0..0ab16bc 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java @@ -87,7 +87,7 @@ public class NettySimpleAmqpServer extends NettyServer { private ConnectionIntercepter connectionIntercepter; public NettySimpleAmqpServer(TransportOptions options, boolean secure) { - this(options, false, false); + this(options, secure, false); } public NettySimpleAmqpServer(TransportOptions options, boolean secure, boolean needClientAuth) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org