Repository: qpid-jms Updated Branches: refs/heads/master 0cf4c0fce -> 1144acaf8
QPIDJMS-191 Add some tests for connection URI path handling. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1144acaf Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1144acaf Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1144acaf Branch: refs/heads/master Commit: 1144acaf8c4085762a5e65860e4124ac844330fd Parents: 0cf4c0f Author: Timothy Bish <[email protected]> Authored: Tue Aug 9 14:06:37 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Aug 9 14:06:37 2016 -0400 ---------------------------------------------------------------------- .../jms/transports/netty/NettyEchoServer.java | 57 +++++++++++++-- .../transports/netty/NettyWsTransportTest.java | 74 ++++++++++++++++++++ 2 files changed, 124 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1144acaf/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java index 933e872..0884fc4 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java @@ -16,8 +16,12 @@ */ package org.apache.qpid.jms.transports.netty; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + import java.io.IOException; import java.net.ServerSocket; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,7 +37,10 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -41,9 +48,13 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -67,6 +78,7 @@ public class NettyEchoServer implements AutoCloseable { private int serverPort; private final boolean needClientAuth; private final boolean webSocketServer; + private String webSocketPath = WEBSOCKET_PATH; private volatile SslHandler sslHandler; private final AtomicBoolean started = new AtomicBoolean(); @@ -85,6 +97,14 @@ public class NettyEchoServer implements AutoCloseable { this.webSocketServer = webSocketServer; } + public String getWebSocketPath() { + return webSocketPath; + } + + public void setWebSocketPath(String webSocketPath) { + this.webSocketPath = webSocketPath; + } + public void start() throws Exception { if (started.compareAndSet(false, true)) { @@ -115,7 +135,7 @@ public class NettyEchoServer implements AutoCloseable { if (webSocketServer) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(65536)); - ch.pipeline().addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "amqp", true)); + ch.pipeline().addLast(new WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true)); } ch.pipeline().addLast(new EchoServerHandler()); @@ -197,10 +217,17 @@ public class NettyEchoServer implements AutoCloseable { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { LOG.trace("Channel read: {}", msg); - if (webSocketServer && msg instanceof BinaryWebSocketFrame) { - BinaryWebSocketFrame frame = (BinaryWebSocketFrame) msg; - ctx.write(frame.copy()); - return; + if (webSocketServer) { + if (msg instanceof WebSocketFrame) { + WebSocketFrame frame = (WebSocketFrame) msg; + ctx.write(frame.copy()); + return; + } else if (msg instanceof FullHttpRequest) { + // Reject anything not on the WebSocket path + FullHttpRequest request = (FullHttpRequest) msg; + sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); + return; + } } else if (msg instanceof ByteBuf) { ctx.write(((ByteBuf) msg).copy()); return; @@ -224,7 +251,23 @@ public class NettyEchoServer implements AutoCloseable { } } - SslHandler getSslHandler() { + private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) { + // Generate an error page if response getStatus code is not OK (200). + if (response.getStatus().code() != 200) { + ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8); + response.content().writeBytes(buf); + buf.release(); + HttpHeaders.setContentLength(response, response.content().readableBytes()); + } + + // Send the response and close the connection if necessary. + ChannelFuture f = ctx.channel().writeAndFlush(response); + if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + protected SslHandler getSslHandler() { return sslHandler; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1144acaf/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java index 7348775..6ed537a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java @@ -16,17 +16,27 @@ */ package org.apache.qpid.jms.transports.netty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.net.URI; import org.apache.qpid.jms.transports.Transport; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.transports.TransportOptions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test the Netty based WebSocket Transport */ public class NettyWsTransportTest extends NettyTcpTransportTest { + private static final Logger LOG = LoggerFactory.getLogger(NettyWsTransportTest.class); + @Override protected NettyEchoServer createEchoServer(TransportOptions options, boolean needClientAuth) { return new NettyEchoServer(options, needClientAuth, true); @@ -40,4 +50,68 @@ public class NettyWsTransportTest extends NettyTcpTransportTest { return new NettyWsTransport(listener, serverLocation, options); } } + + @Test(timeout = 60 * 1000) + public void testConnectToServerUsingCorrectPath() throws Exception { + final String WEBSOCKET_PATH = "/testpath"; + + try (NettyEchoServer server = createEchoServer(createServerOptions())) { + server.setWebSocketPath(WEBSOCKET_PATH); + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port + WEBSOCKET_PATH); + + Transport transport = createTransport(serverLocation, testListener, createClientOptions()); + try { + transport.connect(); + LOG.info("Connected to server:{} as expected.", serverLocation); + } catch (Exception e) { + fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); + } + + assertTrue(transport.isConnected()); + assertEquals(serverLocation, transport.getRemoteLocation()); + + transport.close(); + + // Additional close should not fail or cause other problems. + transport.close(); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testConnectToServerUsingIncorrectPath() throws Exception { + final String WEBSOCKET_PATH = "/testpath"; + + try (NettyEchoServer server = createEchoServer(createServerOptions())) { + // No configured path means it won't match the requested one. + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port + WEBSOCKET_PATH); + + server.close(); + + Transport transport = createTransport(serverLocation, testListener, createClientOptions()); + try { + transport.connect(); + fail("Should have failed to connect to the server: " + serverLocation); + } catch (Exception e) { + LOG.info("Failed to connect to: {} as expected.", serverLocation); + } + + assertFalse(transport.isConnected()); + + transport.close(); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
