Repository: qpid-jms
Updated Branches:
  refs/heads/master 0cf4c0fce -> 1144acaf8


QPIDJMS-191 Add some tests for connection URI path handling.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1144acaf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1144acaf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1144acaf

Branch: refs/heads/master
Commit: 1144acaf8c4085762a5e65860e4124ac844330fd
Parents: 0cf4c0f
Author: Timothy Bish <[email protected]>
Authored: Tue Aug 9 14:06:37 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue Aug 9 14:06:37 2016 -0400

----------------------------------------------------------------------
 .../jms/transports/netty/NettyEchoServer.java   | 57 +++++++++++++--
 .../transports/netty/NettyWsTransportTest.java  | 74 ++++++++++++++++++++
 2 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1144acaf/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
index 933e872..0884fc4 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
@@ -16,8 +16,12 @@
  */
 package org.apache.qpid.jms.transports.netty;
 
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -33,7 +37,10 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -41,9 +48,13 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
@@ -67,6 +78,7 @@ public class NettyEchoServer implements AutoCloseable {
     private int serverPort;
     private final boolean needClientAuth;
     private final boolean webSocketServer;
+    private String webSocketPath = WEBSOCKET_PATH;
     private volatile SslHandler sslHandler;
 
     private final AtomicBoolean started = new AtomicBoolean();
@@ -85,6 +97,14 @@ public class NettyEchoServer implements AutoCloseable {
         this.webSocketServer = webSocketServer;
     }
 
+    /**
+     * @return the path currently configured for the WebSocket endpoint; this is the
+     *         value passed to the WebSocketServerProtocolHandler when a channel
+     *         pipeline is set up, and it starts out as WEBSOCKET_PATH.
+     */
+    public String getWebSocketPath() {
+        return webSocketPath;
+    }
+
+    /**
+     * Sets the path used for the WebSocket endpoint.
+     *
+     * NOTE(review): the value is read when a channel pipeline is built, so it is
+     * presumably meant to be set before start() is called - confirm.
+     *
+     * @param webSocketPath
+     *        the path to hand to the WebSocketServerProtocolHandler.
+     */
+    public void setWebSocketPath(String webSocketPath) {
+        this.webSocketPath = webSocketPath;
+    }
+
     public void start() throws Exception {
 
         if (started.compareAndSet(false, true)) {
@@ -115,7 +135,7 @@ public class NettyEchoServer implements AutoCloseable {
                     if (webSocketServer) {
                         ch.pipeline().addLast(new HttpServerCodec());
                         ch.pipeline().addLast(new HttpObjectAggregator(65536));
-                        ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(WEBSOCKET_PATH, "amqp", true));
+                        ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true));
                     }
 
                     ch.pipeline().addLast(new EchoServerHandler());
@@ -197,10 +217,17 @@ public class NettyEchoServer implements AutoCloseable {
         @Override
         public void channelRead0(ChannelHandlerContext ctx, Object msg) {
             LOG.trace("Channel read: {}", msg);
-            if (webSocketServer && msg instanceof BinaryWebSocketFrame) {
-                BinaryWebSocketFrame frame = (BinaryWebSocketFrame) msg;
-                ctx.write(frame.copy());
-                return;
+            if (webSocketServer) {
+                if (msg instanceof WebSocketFrame) {
+                    WebSocketFrame frame = (WebSocketFrame) msg;
+                    ctx.write(frame.copy());
+                    return;
+                } else if (msg instanceof FullHttpRequest) {
+                    // Reject anything not on the WebSocket path
+                    FullHttpRequest request = (FullHttpRequest) msg;
+                    sendHttpResponse(ctx, request, new 
DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
+                    return;
+                }
             } else if (msg instanceof ByteBuf) {
                 ctx.write(((ByteBuf) msg).copy());
                 return;
@@ -224,7 +251,23 @@ public class NettyEchoServer implements AutoCloseable {
         }
     }
 
-    SslHandler getSslHandler() {
+    /**
+     * Writes the given HTTP response to the channel and, when appropriate, closes
+     * the channel once the write has completed.
+     *
+     * For any status other than OK (200) the textual form of the status is appended
+     * to the response content and the content length is updated to match.  The
+     * channel is closed after the write unless the response is OK and the request
+     * asked for keep-alive.
+     *
+     * @param ctx
+     *        context whose channel the response is written to.
+     * @param request
+     *        the request being answered, consulted only for its keep-alive setting.
+     * @param response
+     *        the response to send; its content and headers may be modified.
+     */
+    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
+        final boolean statusOk = response.getStatus().code() == 200;
+
+        // Anything other than OK carries the status text as a minimal error body.
+        if (!statusOk) {
+            ByteBuf statusText = Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);
+            response.content().writeBytes(statusText);
+            statusText.release();
+            HttpHeaders.setContentLength(response, response.content().readableBytes());
+        }
+
+        // Flush the response, then close unless this was an OK keep-alive exchange.
+        ChannelFuture writeFuture = ctx.channel().writeAndFlush(response);
+        if (!statusOk || !HttpHeaders.isKeepAlive(request)) {
+            writeFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+
+    /**
+     * @return the current value of the {@code sslHandler} field.
+     */
+    protected SslHandler getSslHandler() {
         return sslHandler;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1144acaf/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
index 7348775..6ed537a 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
@@ -16,17 +16,27 @@
  */
 package org.apache.qpid.jms.transports.netty;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.net.URI;
 
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test the Netty based WebSocket Transport
  */
 public class NettyWsTransportTest extends NettyTcpTransportTest {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyWsTransportTest.class);
+
     @Override
     protected NettyEchoServer createEchoServer(TransportOptions options, 
boolean needClientAuth) {
         return new NettyEchoServer(options, needClientAuth, true);
@@ -40,4 +50,68 @@ public class NettyWsTransportTest extends 
NettyTcpTransportTest {
             return new NettyWsTransport(listener, serverLocation, options);
         }
     }
+
+    /**
+     * Configures the echo server with a custom WebSocket path and verifies that a
+     * transport whose URI carries that same path connects, reports the expected
+     * remote location, and can be closed (twice) without side effects on the
+     * test listener.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServerUsingCorrectPath() throws Exception {
+        final String WEBSOCKET_PATH = "/testpath";
+
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            // Server and client must agree on the path for the handshake to succeed.
+            server.setWebSocketPath(WEBSOCKET_PATH);
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port + WEBSOCKET_PATH);
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            try {
+                transport.connect();
+                LOG.info("Connected to server:{} as expected.", serverLocation);
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue(transport.isConnected());
+            assertEquals(serverLocation, transport.getRemoteLocation());
+
+            transport.close();
+
+            // Additional close should not fail or cause other problems.
+            transport.close();
+        }
+
+        assertFalse(transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    /**
+     * Starts the echo server with its default WebSocket path and verifies that a
+     * transport whose URI requests a different path fails to connect.
+     *
+     * The server is deliberately left running for the duration of the connect
+     * attempt so that the failure comes from the path mismatch rather than from
+     * nothing listening on the port.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServerUsingIncorrectPath() throws Exception {
+        final String WEBSOCKET_PATH = "/testpath";
+
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+            // No configured path means it won't match the requested one.
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port + WEBSOCKET_PATH);
+
+            Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+            try {
+                transport.connect();
+                fail("Should have failed to connect to the server: " + serverLocation);
+            } catch (Exception e) {
+                LOG.info("Failed to connect to: {} as expected.", serverLocation);
+            }
+
+            assertFalse(transport.isConnected());
+
+            transport.close();
+        }
+
+        assertFalse(transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to