This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new d06bff9  QPIDJMS-441: add support for supplying proxy handlers to the 
transports
d06bff9 is described below

commit d06bff94a089bc1fa36bfbe7e9719777b0a1d61e
Author: Stephan Siano <stephan.si...@sap.com>
AuthorDate: Wed Oct 9 10:28:02 2019 +0200

    QPIDJMS-441: add support for supplying proxy handlers to the transports
    
    Changes from Stephan Siano, Robbie Gemmell and Timothy Bish. This closes 
#32.
---
 pom.xml                                            |   5 +
 qpid-jms-client/pom.xml                            |   5 +
 .../apache/qpid/jms/JmsConnectionExtensions.java   |  22 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java       |   9 +
 .../qpid/jms/transports/TransportOptions.java      |  12 +
 .../jms/transports/netty/NettyTcpTransport.java    |  16 +
 .../jms/transports/netty/NettyWsTransport.java     |  15 +-
 .../qpid/jms/integration/ProxyIntegrationTest.java | 410 +++++++++++++++++++++
 .../org/apache/qpid/jms/test/proxy/TestProxy.java  | 389 +++++++++++++++++++
 .../qpid/jms/transports/TransportOptionsTest.java  |  11 +-
 .../qpid/jms/transports/netty/NettyServer.java     |  10 +-
 .../transports/netty/NettySimpleAmqpServer.java    |   9 +-
 .../transports/netty/NettySslTransportTest.java    |  66 +++-
 .../transports/netty/NettyTcpToMockServerTest.java | 101 +++--
 .../transports/netty/NettyTcpTransportTest.java    |  58 +++
 .../jms/transports/netty/NettyWsTransportTest.java |  51 +++
 .../transports/netty/NettyWssTransportTest.java    |  12 +
 17 files changed, 1132 insertions(+), 69 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0745357..6f73d16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,11 @@
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
+        <artifactId>netty-handler-proxy</artifactId>
+        <version>${netty-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
         <artifactId>netty-transport</artifactId>
         <version>${netty-version}</version>
       </dependency>
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index b879020..fe3c2b5 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -88,6 +88,11 @@
       <artifactId>opentracing-util</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler-proxy</artifactId>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- =================================== -->
     <!-- Testing Dependencies                -->
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
index 46a592f..5e1b03d 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms;
 import java.net.URI;
 import java.util.Map;
 import java.util.function.BiFunction;
+import java.util.function.Supplier;
 
 import javax.jms.Connection;
 import javax.net.ssl.SSLContext;
@@ -103,7 +104,26 @@ public enum JmsConnectionExtensions {
      *   </li>
      * </ul>
      */
-    HTTP_HEADERS_OVERRIDE("httpHeaders");
+    HTTP_HEADERS_OVERRIDE("httpHeaders"),
+
+    /**
+     * Allows a user to inject a custom proxy handler supplier used when 
creating a transport
+     * for the connection.
+     * <p>
+     * For example, for Netty based transports if a supplier was returned it 
would provide
+     * one of Netty's proxy handlers such as HttpProxyHandler, 
Socks4ProxyHandler, or
+     * Socks5ProxyHandler created with appropriate login configuration etc.
+     * <p>
+     * If the function returns a {@link Supplier}, it must supply a proxy 
handler when requested.
+     * <p>
+     * The extension function takes the form of a BiFunction defined as the 
following:
+     * <ul>
+     *   <li>
+     *     {@link BiFunction}&lt;{@link Connection}, {@link URI}, {@link 
Supplier}&gt;
+     *   </li>
+     * </ul>
+     */
+    PROXY_HANDLER_SUPPLIER("proxyHandlerSupplier");
 
     private final String extensionKey;
 
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 73a5789..718019b 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import javax.net.ssl.SSLContext;
 
@@ -204,6 +205,14 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
             sslContextOverride = null;
         }
 
+        if 
(connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER))
 {
+            Supplier<?> proxyHandlerSupplier = (Supplier<?>) 
connectionInfo.getExtensionMap().get(
+                    
JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER).apply(connectionInfo.getConnection(),
 transport.getRemoteLocation());
+            if (proxyHandlerSupplier != null) {
+                
transport.getTransportOptions().setProxyHandlerSupplier(proxyHandlerSupplier);
+            }
+        }
+
         if 
(connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE))
 {
             @SuppressWarnings({ "unchecked" })
             Map<String, String> headers = (Map<String, String>)
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
index 2435151..ba62d04 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import javax.net.ssl.SSLContext;
 
@@ -90,6 +91,7 @@ public class TransportOptions implements Cloneable {
     private String keyAlias;
     private int defaultSslPort = DEFAULT_SSL_PORT;
     private SSLContext sslContextOverride;
+    private Supplier<?> proxyHandlerSupplier;
 
     private final Map<String, String> httpHeaders = new HashMap<>();
 
@@ -527,6 +529,15 @@ public class TransportOptions implements Cloneable {
         return sslContextOverride;
     }
 
+    @SuppressWarnings("unchecked")
+    public <T> Supplier<T> getProxyHandlerSupplier() {
+        return (Supplier<T>) proxyHandlerSupplier;
+    }
+
+    public <T> void setProxyHandlerSupplier(Supplier<T> proxyHandlerFactory) {
+        this.proxyHandlerSupplier = proxyHandlerFactory;
+    }
+
     // TODO - Expose headers ( ? getWSHeaders : getAuthHeaders ...
     public Map<String, String> getHttpHeaders() {
         return httpHeaders;
@@ -575,6 +586,7 @@ public class TransportOptions implements Cloneable {
         copy.setContextProtocol(getContextProtocol());
         copy.setDefaultSslPort(getDefaultSslPort());
         copy.setSslContextOverride(getSslContextOverride());
+        copy.setProxyHandlerSupplier(getProxyHandlerSupplier());
         copy.setUseOpenSSL(isUseOpenSSL());
         copy.setLocalAddress(getLocalAddress());
         copy.setLocalPort(getLocalPort());
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index 5ddeea4..9f8e826 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -19,11 +19,13 @@ package org.apache.qpid.jms.transports.netty;
 import java.io.IOException;
 import java.net.URI;
 import java.security.Principal;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import javax.net.ssl.SSLContext;
 
@@ -51,7 +53,9 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.proxy.ProxyHandler;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.resolver.NoopAddressResolverGroup;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -464,9 +468,21 @@ public class NettyTcpTransport implements Transport {
                 bootstrap.localAddress(options.getLocalPort());
             }
         }
+        if (options.getProxyHandlerSupplier() != null) {
+            // in case we have a proxy we do not want to resolve the address 
by ourselves but leave this to the proxy
+            bootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
+        }
     }
 
     private void configureChannel(final Channel channel) throws Exception {
+        if (options.getProxyHandlerSupplier() != null) {
+            Supplier<ProxyHandler> proxyHandlerSupplier = 
options.getProxyHandlerSupplier();
+
+            ProxyHandler proxyHandler = proxyHandlerSupplier.get();
+            Objects.requireNonNull(proxyHandler, "No proxy handler was 
returned by the supplier");
+
+            channel.pipeline().addFirst(proxyHandler);
+        }
         if (isSecure()) {
             final SslHandler sslHandler;
             try {
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
index 52d4013..8ce8d41 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
@@ -32,8 +32,9 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestEncoder;
+import io.netty.handler.codec.http.HttpResponseDecoder;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
@@ -116,7 +117,17 @@ public class NettyWsTransport extends NettyTcpTransport {
 
     @Override
     protected void addAdditionalHandlers(ChannelPipeline pipeline) {
-        pipeline.addLast(new HttpClientCodec());
+        /*
+         * If we use a HttpClientCodec here instead of the HttpRequestEncoder 
and the HttpResponseDecoder
+         * and there is a HttpProxyHandler in the pipeline, that ProxyHandler 
will add another HttpClientCodec
+         * for communication with the proxy. When the 
WebSocketClientHandshaker tries to exchange the codecs in
+         * the pipeline, it will mix up the two HttpRequestEncoders in the 
pipeline and exchange the wrong one.
+         * HttpRequestEncoder and HttpResponseDecoder have precedence over the 
HttpClientCodec, so the
+         * WebSocketClientHandshaker will remove these handlers inserted here 
and will leave the HttpClientCodec
+         * added by the HttpProxyHandler alone.
+         */
+        pipeline.addLast(new HttpResponseDecoder());
+        pipeline.addLast(new HttpRequestEncoder());
         pipeline.addLast(new HttpObjectAggregator(8192));
     }
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProxyIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProxyIntegrationTest.java
new file mode 100644
index 0000000..96fd7b7
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProxyIntegrationTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionExtensions;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.proxy.TestProxy;
+import org.apache.qpid.jms.test.proxy.TestProxy.ProxyType;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportSupport;
+import org.apache.qpid.jms.transports.netty.NettySimpleAmqpServer;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.handler.proxy.HttpProxyHandler;
+import io.netty.handler.proxy.ProxyHandler;
+import io.netty.handler.proxy.Socks5ProxyHandler;
+
+@RunWith(QpidJMSTestRunner.class)
+public class ProxyIntegrationTest extends QpidJmsTestCase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProxyIntegrationTest.class);
+    private static final String BROKER_JKS_KEYSTORE = 
"src/test/resources/broker-jks.keystore";
+    private static final String CLIENT_JKS_TRUSTSTORE = 
"src/test/resources/client-jks.truststore";
+    private static final String PASSWORD = "password";
+
+    @Test(timeout = 20000)
+    public void testCreateConnectionViaSocksProxy() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();
+             TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
+            testProxy.start();
+
+            AtomicInteger supplierUsageCount = new AtomicInteger();
+            Supplier<ProxyHandler> proxyHandlerSupplier = () -> {
+                supplierUsageCount.incrementAndGet();
+                return new Socks5ProxyHandler(new 
InetSocketAddress("localhost", testProxy.getPort()));
+            };
+
+            Connection connection = establishConnecton(testPeer, 
proxyHandlerSupplier, false, null);
+
+            testPeer.expectClose();
+            connection.close();
+
+            assertEquals(1, testProxy.getSuccessCount());
+            assertEquals("Unexpected handler supplier usage count", 1, 
supplierUsageCount.get());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateSecureConnectionViaSocksProxy() throws Exception {
+        TransportOptions sslOptions = new TransportOptions();
+        sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        sslOptions.setKeyStorePassword(PASSWORD);
+        sslOptions.setVerifyHost(false);
+
+        SSLContext context = TransportSupport.createJdkSslContext(sslOptions);
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(context, false);
+             TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
+            testProxy.start();
+
+            String connOptions = "?transport.trustStoreLocation=" + 
CLIENT_JKS_TRUSTSTORE + "&" + "transport.trustStorePassword=" + PASSWORD
+                    + "&" + "transport.useOpenSSL=" + false;
+            AtomicInteger supplierUsageCount = new AtomicInteger();
+            Supplier<ProxyHandler> proxyHandlerSupplier = () -> {
+                supplierUsageCount.incrementAndGet();
+                return new Socks5ProxyHandler(new 
InetSocketAddress("localhost", testProxy.getPort()));
+            };
+
+            Connection connection = establishConnecton(testPeer, 
proxyHandlerSupplier, true, connOptions);
+
+            Socket socket = testPeer.getClientSocket();
+            assertTrue(socket instanceof SSLSocket);
+
+            testPeer.expectClose();
+            connection.close();
+
+            assertEquals(1, testProxy.getSuccessCount());
+            assertEquals("Unexpected handler supplier usage count", 1, 
supplierUsageCount.get());
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testFailoverCreateConsumerAfterConnectionDropsViaSocksProxy() 
throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();
+             TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
+            testProxy.start();
+
+            int proxyPort = testProxy.getPort();
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+            LOG.info("Proxy is at port: {}", proxyPort);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.dropAfterLastHandler();
+
+            AtomicInteger supplierUsageCount = new AtomicInteger();
+            Supplier<ProxyHandler> proxyHandlerSupplier = () -> {
+                supplierUsageCount.incrementAndGet();
+                return new Socks5ProxyHandler(new 
InetSocketAddress("localhost", proxyPort));
+            };
+
+            final JmsConnection connection = 
establishFailoverConnecton(proxyHandlerSupplier, originalPeer, finalPeer);
+            ((JmsDefaultPrefetchPolicy) 
connection.getPrefetchPolicy()).setQueuePrefetch(0);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", 
originalConnected.await(5, TimeUnit.SECONDS));
+            assertEquals("Unexpected handler supplier usage count", 1, 
supplierUsageCount.get());
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(1)));
+            finalPeer.expectLinkFlow(true, true, 
equalTo(UnsignedInteger.valueOf(1)));
+            finalPeer.expectDetach(true, true, true);
+            finalPeer.expectClose();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            assertNull(consumer.receive(500));
+            LOG.info("Receive returned");
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, 
TimeUnit.SECONDS));
+
+            LOG.info("Closing consumer");
+            consumer.close();
+
+            // Shut it down
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+
+            // connection to originalPeer and finalPeer
+            assertEquals(2, testProxy.getSuccessCount());
+            assertEquals("Unexpected handler supplier usage count", 2, 
supplierUsageCount.get());
+        }
+    }
+
+    @Test(timeout = 30000)
+    public void testCreateWebSocketConnectionViaHttpProxyAndStart() throws 
Exception {
+        doTestCreateWebSocketConnectionViaHttpProxyAndStart(false);
+    }
+
+    @Test(timeout = 30000)
+    public void testCreateSecureWebSocketConnectionViaHttpProxyAndStart() 
throws Exception {
+        doTestCreateWebSocketConnectionViaHttpProxyAndStart(true);
+    }
+
+    private void doTestCreateWebSocketConnectionViaHttpProxyAndStart(boolean 
secure) throws Exception {
+        TransportOptions options = new TransportOptions();
+        options.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        options.setKeyStorePassword(PASSWORD);
+        options.setVerifyHost(false);
+
+        final AtomicBoolean connectedThroughProxy = new AtomicBoolean();
+
+        String connOptions = "?transport.trustStoreLocation=" + 
CLIENT_JKS_TRUSTSTORE +
+                             "&transport.trustStorePassword=" + PASSWORD +
+                             "&transport.useOpenSSL=" + false;
+
+        try (NettySimpleAmqpServer server = new NettySimpleAmqpServer(options, 
secure, false, true);
+             TestProxy testProxy = new TestProxy(ProxyType.HTTP)) {
+
+            server.setConnectionIntercepter((protonConnection) -> {
+                connectedThroughProxy.set(true);
+                return null;
+            });
+
+            server.start();
+            testProxy.start();
+
+            JmsConnectionFactory factory = new 
JmsConnectionFactory(server.getConnectionURI() + connOptions);
+            
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), 
(connection, remote) -> {
+                SocketAddress proxyAddress = new 
InetSocketAddress("localhost", testProxy.getPort());
+                Supplier<ProxyHandler> proxyHandlerFactory = () -> {
+                    return new HttpProxyHandler(proxyAddress);
+                };
+                return proxyHandlerFactory;
+            });
+
+            Connection connection = factory.createConnection();
+            assertNotNull(connection);
+
+            connection.start();
+
+            Session session = connection.createSession();
+            assertNotNull(connection);
+            session.close();
+
+            connection.close();
+
+            assertEquals(1, testProxy.getSuccessCount());
+            assertTrue("Client did not connect to test server through the 
proxy.", connectedThroughProxy.get());
+        }
+    }
+
+    @Test(timeout = 30000)
+    public void testCreateWebSocketConnectionViaSocksProxyAndStart() throws 
Exception {
+        doTestCreateWebSocketConnectionViaSocksProxyAndStart(false);
+    }
+
+    @Test(timeout = 30000)
+    public void testCreateSecureWebSocketConnectionViaSocksProxyAndStart() 
throws Exception {
+        doTestCreateWebSocketConnectionViaSocksProxyAndStart(true);
+    }
+
+    private void doTestCreateWebSocketConnectionViaSocksProxyAndStart(boolean 
secure) throws Exception {
+        TransportOptions serverOptions = new TransportOptions();
+        serverOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverOptions.setKeyStorePassword(PASSWORD);
+        serverOptions.setVerifyHost(false);
+
+        final AtomicBoolean connectedThroughProxy = new AtomicBoolean();
+
+        String connOptions = "?transport.trustStoreLocation=" + 
CLIENT_JKS_TRUSTSTORE +
+                             "&transport.trustStorePassword=" + PASSWORD +
+                             "&transport.useOpenSSL=" + false;
+
+        try (NettySimpleAmqpServer server = new 
NettySimpleAmqpServer(serverOptions, secure, false, true);
+             TestProxy testProxy = new TestProxy(ProxyType.SOCKS5)) {
+
+            server.setConnectionIntercepter((protonConnection) -> {
+                connectedThroughProxy.set(true);
+                return null;
+            });
+
+            server.start();
+            testProxy.start();
+
+            JmsConnectionFactory factory = new 
JmsConnectionFactory(server.getConnectionURI() + connOptions);
+            
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), 
(connection, remote) -> {
+                SocketAddress proxyAddress = new 
InetSocketAddress("localhost", testProxy.getPort());
+                Supplier<ProxyHandler> proxyHandlerFactory = () -> {
+                    return new Socks5ProxyHandler(proxyAddress);
+                };
+                return proxyHandlerFactory;
+            });
+
+            Connection connection = factory.createConnection();
+            assertNotNull(connection);
+
+            connection.start();
+
+            Session session = connection.createSession();
+            assertNotNull(connection);
+            session.close();
+
+            connection.close();
+
+            assertEquals(1, testProxy.getSuccessCount());
+            assertTrue("Client did not connect to test server through the 
proxy.", connectedThroughProxy.get());
+        }
+    }
+
+    private Connection establishConnecton(TestAmqpPeer testPeer, 
Supplier<ProxyHandler> proxyHandlerSupplier, boolean ssl, String optionsString) 
throws JMSException {
+        testPeer.expectSaslPlain("guest", "guest");
+        testPeer.expectOpen();
+
+        // Each connection creates a session for managing temporary 
destinations etc
+        testPeer.expectBegin();
+
+        String remoteURI = buildURI(testPeer, ssl, optionsString);
+        LOG.debug("connect to {}", remoteURI);
+        JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), 
(connection1, remote) -> {
+            return proxyHandlerSupplier;
+        });
+        Connection connection = factory.createConnection("guest", "guest");
+
+        // Set a clientId to provoke the actual AMQP connection process to 
occur.
+        connection.setClientID("clientName");
+
+        assertNull(testPeer.getThrowable());
+
+        return connection;
+    }
+
+    private JmsConnection establishFailoverConnecton(Supplier<ProxyHandler> 
proxyHandlerSupplier, TestAmqpPeer... peers) throws JMSException {
+        if (peers.length == 0) {
+            throw new IllegalArgumentException("No test peers were given, at 
least 1 required");
+        }
+
+        String remoteURI = "failover:(";
+        boolean first = true;
+        for (TestAmqpPeer peer : peers) {
+            if (!first) {
+                remoteURI += ",";
+            }
+            remoteURI += createPeerURI(peer, null);
+            first = false;
+        }
+
+        remoteURI += ")?failover.maxReconnectAttempts=10";
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        
factory.setExtension(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER.toString(), 
(connection, remote) -> {
+            return proxyHandlerSupplier;
+        });
+
+        Connection connection = factory.createConnection();
+
+        return (JmsConnection) connection;
+    }
+
+    private String createPeerURI(TestAmqpPeer peer) {
+        return createPeerURI(peer, null);
+    }
+
+    private String createPeerURI(TestAmqpPeer peer, String params) {
+        return "amqp://localhost:" + peer.getServerPort() + (params != null ? 
"?" + params : "");
+    }
+
+    private String buildURI(TestAmqpPeer testPeer, boolean ssl, String 
optionsString) {
+        String scheme = ssl ? "amqps" : "amqp";
+        final String baseURI = scheme + "://localhost:" + 
testPeer.getServerPort();
+        String remoteURI = baseURI;
+        if (optionsString != null) {
+            if (optionsString.startsWith("?")) {
+                remoteURI = baseURI + optionsString;
+            } else {
+                remoteURI = baseURI + "?" + optionsString;
+            }
+        }
+        return remoteURI;
+    }
+}
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/proxy/TestProxy.java 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/proxy/TestProxy.java
new file mode 100644
index 0000000..4bbe8aa
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/proxy/TestProxy.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.test.proxy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CompletionHandler;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestProxy implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(TestProxy.class);
+    private static final int TIMEOUT_IN_S = 2;
+    private AsynchronousServerSocketChannel serverSocketChannel;
+    private AtomicInteger connectCount = new AtomicInteger();
+    private ProxyReadHandler readHandler = new ProxyReadHandler();
+    private ProxyWriteHandler writeHandler = new ProxyWriteHandler();
+    private int port;
+    private final ProxyType type;
+
+    public enum ProxyType {
+        SOCKS5, HTTP
+    }
+
+    /**
+     * Creates a proxy that accepts only handshakes of the given type.
+     * The proxy does not listen until {@link #start()} is called.
+     *
+     * @param type the proxy protocol to accept; must not be null
+     * @throws NullPointerException if type is null
+     */
+    public TestProxy(ProxyType type) throws IOException {
+        this.type = Objects.requireNonNull(type, "Proxy type must be given");
+    }
+
+    /**
+     * @return the local port recorded by {@link #start()}; 0 if start() has not run yet
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * @return the number of outbound connections to target servers that this proxy
+     *         has successfully established so far
+     */
+    public int getSuccessCount() {
+        return connectCount.get();
+    }
+
+    /**
+     * Opens the listen socket on an ephemeral port, records that port and begins
+     * accepting client connections asynchronously.
+     *
+     * @throws IOException if the server socket cannot be opened or bound
+     */
+    public void start() throws IOException {
+        final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open();
+        serverSocketChannel = listener;
+
+        // Port 0 lets the OS pick a free port; read it back for getPort().
+        listener.bind(new InetSocketAddress(0));
+        final InetSocketAddress boundAddress = (InetSocketAddress) listener.getLocalAddress();
+        port = boundAddress.getPort();
+
+        LOG.info("Bound listen socket to port {}, waiting for clients...", port);
+        listener.accept(null, new ServerConnectionHandler());
+    }
+
+    /**
+     * Stops accepting new clients by closing the listen socket, if one was opened.
+     * Failures while closing are logged and not propagated.
+     */
+    @Override
+    public void close() {
+        LOG.info("stopping proxy server");
+        if (serverSocketChannel == null) {
+            // start() was never called, nothing to release
+            return;
+        }
+
+        try {
+            LOG.info("Terminating server socket");
+            serverSocketChannel.close();
+        } catch (Exception e) {
+            LOG.error("Cannot close server socket ", e);
+        }
+    }
+
+    /**
+     * Advances the proxy handshake for one client message held in the attachment's buffer.
+     * On the first message the protocol is detected from the first byte (0x5 means SOCKS5,
+     * anything else is treated as HTTP) and must match the configured proxy type.
+     *
+     * @param attachment the client connection state; its buffer holds the received message
+     * @return true if the buffer now holds a response to be written back to the client
+     */
+    private boolean processHandshakeMessages(ProxyConnectionState attachment) {
+        if (attachment.handshakePhase == HandshakePhase.INITIAL) {
+            final boolean socks5 = attachment.buffer.get(0) == 0x5;
+            attachment.handshakePhase = socks5 ? HandshakePhase.SOCKS5_1 : HandshakePhase.HTTP;
+        }
+
+        final HandshakePhase phase = attachment.handshakePhase;
+        if (!assertExpectedHandshakeType(phase)) {
+            LOG.error("Unexpected handshake phase '" + phase + "' for proxy of type: " + type);
+            return false;
+        }
+
+        if (phase == HandshakePhase.SOCKS5_1) {
+            return processSocks5Handshake1(attachment);
+        }
+        if (phase == HandshakePhase.SOCKS5_2) {
+            return processSocks5Handshake2(attachment);
+        }
+        if (phase == HandshakePhase.HTTP) {
+            return processHttpHandshake(attachment);
+        }
+
+        LOG.error("wrong handshake phase");
+        return false;
+    }
+
+    /**
+     * Checks that the given handshake phase belongs to the protocol this proxy was created for.
+     *
+     * @param handshakePhase the phase the connection is currently in
+     * @return true if the phase is a SOCKS5 phase on a SOCKS5 proxy or the HTTP phase on an
+     *         HTTP proxy; false otherwise, including for phases that are not handshake steps
+     */
+    private boolean assertExpectedHandshakeType(HandshakePhase handshakePhase) {
+        final boolean expected;
+        switch (handshakePhase) {
+        case SOCKS5_1:
+        case SOCKS5_2:
+            expected = (type == ProxyType.SOCKS5);
+            break;
+        case HTTP:
+            expected = (type == ProxyType.HTTP);
+            break;
+        default:
+            LOG.error("Unknown handshake phase type:" + handshakePhase);
+            expected = false;
+            break;
+        }
+        return expected;
+    }
+
+    /**
+     * Handles an HTTP proxy request held in the attachment's buffer and replaces the buffer
+     * content with the HTTP response to send back to the client.
+     * <p>
+     * Only CONNECT is supported. The request method is checked before the target is parsed,
+     * so other methods get a 502 response instead of failing while parsing a target that is
+     * not in "host:port" form. A CONNECT whose target is not a valid "host:port" gets a 400.
+     *
+     * @param attachment the client connection state; its buffer holds the request
+     * @return always true: the buffer holds a response that should be written to the client
+     */
+    private boolean processHttpHandshake(ProxyConnectionState attachment) {
+        String requestString = StandardCharsets.ISO_8859_1.decode(attachment.buffer).toString();
+        LOG.debug("Request received: {}", requestString);
+
+        // Request line layout: METHOD SP request-target SP HTTP-version ...
+        String[] requestLine = requestString.split(" ", 3);
+        String requestType = requestLine[0];
+
+        String line;
+        if (!requestType.equals("CONNECT")) {
+            LOG.error("unsupported request type {}", requestType);
+            line = "HTTP/1.1 502 Bad Gateway\r\n\r\n";
+        } else {
+            String hostandport = requestLine.length > 1 ? requestLine[1] : "";
+            int separator = hostandport.indexOf(':');
+            int targetPort = -1;
+            if (separator >= 0) {
+                try {
+                    targetPort = Integer.parseInt(hostandport.substring(separator + 1));
+                } catch (NumberFormatException e) {
+                    targetPort = -1; // reported as a bad request below
+                }
+            }
+
+            if (targetPort < 0 || targetPort > 0xFFFF) {
+                LOG.error("malformed CONNECT target {}", hostandport);
+                line = "HTTP/1.1 400 Bad Request\r\n\r\n";
+            } else {
+                String hostname = hostandport.substring(0, separator);
+                LOG.info("CONNECT to {}:{}", hostname, targetPort);
+                if (connectToServer(hostname, targetPort, attachment)) {
+                    attachment.handshakePhase = HandshakePhase.CONNECTED;
+                    line = "HTTP/1.1 200 Connection established\r\n\r\n";
+                } else {
+                    // NOTE: connectToServer may already have closed the client channel on
+                    // failure, in which case this response cannot be delivered.
+                    line = "HTTP/1.1 504 Gateway Timeout\r\n\r\n";
+                }
+            }
+        }
+
+        attachment.buffer.clear();
+        attachment.buffer.put(StandardCharsets.ISO_8859_1.encode(line));
+        attachment.buffer.flip();
+        return true;
+    }
+
+    /**
+     * Handles the first SOCKS5 message (the client greeting): verifies the version byte and
+     * replaces the buffer content with a two byte reply selecting "no authentication".
+     *
+     * @param attachment the client connection state; its buffer holds the greeting
+     * @return true if the reply is ready to be written; false if the version was not 5, in
+     *         which case the client channel has been closed
+     */
+    private boolean processSocks5Handshake1(ProxyConnectionState attachment) {
+        final byte version = attachment.buffer.get();
+        if (version == 0x5) {
+            attachment.buffer.clear();
+            attachment.buffer.put(version);
+            attachment.buffer.put((byte) 0x0); // no authentication required
+            attachment.buffer.flip();
+            LOG.info("SOCKS5 connection initialized, no authentication required");
+            attachment.handshakePhase = HandshakePhase.SOCKS5_2;
+            return true;
+        }
+
+        LOG.error("SOCKS Version {} not supported", version);
+        closeChannel(attachment.readChannel);
+        return false;
+    }
+
+    /**
+     * Handles the second SOCKS5 message (the connection request). Only version 5, command
+     * 0x1 and address type 0x3 (a length-prefixed host name) are accepted. On success the
+     * target connection is opened and the request bytes are reused as the reply, with the
+     * second byte set to 0x0.
+     *
+     * @param attachment the client connection state; its buffer holds the request
+     * @return true if the reply is ready to be written; false if the request was rejected
+     *         or the target connection could not be established
+     */
+    private boolean processSocks5Handshake2(ProxyConnectionState attachment) {
+        final ByteBuffer request = attachment.buffer;
+
+        final byte version = request.get();
+        if (version != 0x5) {
+            LOG.error("SOCKS Version {} not supported", version);
+            closeChannel(attachment.readChannel);
+            return false;
+        }
+
+        final byte command = request.get();
+        if (command != 0x1) {
+            LOG.error("CMD {} not supported", command);
+            closeChannel(attachment.readChannel);
+            return false;
+        }
+
+        request.get(); // skip the next byte (should be 0x0)
+
+        final byte addressType = request.get();
+        if (addressType != 0x3) {
+            LOG.error("Address Type {} not supported", addressType);
+            closeChannel(attachment.readChannel);
+            return false;
+        }
+
+        // The host name is prefixed by its length as an unsigned byte.
+        final byte[] hostBytes = new byte[request.get() & 0xFF];
+        request.get(hostBytes);
+        final String hostname = new String(hostBytes, StandardCharsets.UTF_8);
+        final int targetPort = request.getShort() & 0xffff; // unsigned short
+
+        LOG.info("Create SOCKS5 connection to {}:{}", hostname, targetPort);
+        if (!connectToServer(hostname, targetPort, attachment)) {
+            return false;
+        }
+
+        // Echo the request back as the reply, overwriting only the second byte with 0x0.
+        request.rewind();
+        request.put(1, (byte) 0x0);
+        attachment.handshakePhase = HandshakePhase.CONNECTED; // handshake done
+        return true;
+    }
+
+    /**
+     * @return true while the connection's handshake phase is anything other than CONNECTED
+     */
+    private boolean isInHandshake(ProxyConnectionState attachment) {
+        return attachment.handshakePhase != HandshakePhase.CONNECTED;
+    }
+
+    /**
+     * Opens the outbound connection to the target server and wires it to the client
+     * connection: the new channel becomes the client's write channel, and a mirrored state
+     * (reading from the server, writing to the client) starts reading server data.
+     * <p>
+     * The outbound channel is closed on every failure path so that a refused or timed out
+     * connect does not leak a socket, and the thread's interrupt flag is restored if the
+     * wait for the connect was interrupted.
+     *
+     * @param hostname the target host
+     * @param port the target port
+     * @param attachment the client connection state to attach the server channel to
+     * @return true if the connection was established; false otherwise. When connecting
+     *         fails (as opposed to setting TCP_NODELAY) the client channel is closed too.
+     */
+    private boolean connectToServer(String hostname, int port, ProxyConnectionState attachment) {
+        AsynchronousSocketChannel serverChannel = null;
+        boolean connected = false;
+        try {
+            serverChannel = AsynchronousSocketChannel.open();
+            try {
+                serverChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
+            } catch (IOException e) {
+                LOG.error("Failed to set TCP_NODELAY before connect, closing channel", e);
+                return false; // serverChannel is closed in the finally block
+            }
+
+            SocketAddress serverAddr = new InetSocketAddress(hostname, port);
+            Future<Void> connectResult = serverChannel.connect(serverAddr);
+            connectResult.get(TIMEOUT_IN_S, TimeUnit.SECONDS);
+            attachment.writeChannel = serverChannel;
+            int connectionNumber = connectCount.incrementAndGet();
+            LOG.info("Connection {} to {}:{} established", connectionNumber, hostname, port);
+            ProxyConnectionState serverState = new ProxyConnectionState();
+            serverState.readChannel = attachment.writeChannel;
+            serverState.writeChannel = attachment.readChannel;
+            serverState.buffer = ByteBuffer.allocate(4096);
+            serverState.handshakePhase = HandshakePhase.CONNECTED;
+
+            // read from server
+            serverState.readChannel.read(serverState.buffer, TIMEOUT_IN_S, TimeUnit.SECONDS, serverState, readHandler);
+            connected = true;
+            return true;
+        } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+            if (e instanceof InterruptedException) {
+                // Do not swallow the interruption; let the owner of this thread see it.
+                Thread.currentThread().interrupt();
+            }
+            LOG.error("connection failed ", e);
+            closeChannel(attachment.readChannel);
+            return false;
+        } finally {
+            if (!connected) {
+                // Also reached for unchecked failures, e.g. an unresolvable address.
+                closeChannel(serverChannel);
+            }
+        }
+    }
+
+    /**
+     * Closes the given channel if it is non-null and still open; close failures are logged.
+     */
+    private void closeChannel(Channel channel) {
+        if (channel == null || !channel.isOpen()) {
+            return;
+        }
+
+        try {
+            channel.close();
+        } catch (IOException e) {
+            LOG.error("cannot close", e);
+        }
+    }
+
+    /**
+     * Shuts down the output side of the given channel if it is non-null and still open,
+     * leaving the channel itself open; failures are logged.
+     */
+    private void shutdownOutput(AsynchronousSocketChannel channel) {
+        if (channel == null || !channel.isOpen()) {
+            return;
+        }
+
+        try {
+            LOG.info("shutdown output for ({})", channel);
+            channel.shutdownOutput();
+        } catch (IOException e) {
+            LOG.error("cannot shutdown output to ({})", channel, e);
+        }
+    }
+
+    /**
+     * State for one direction of a proxied connection, passed as the attachment to the
+     * asynchronous read and write operations.
+     */
+    private static class ProxyConnectionState {
+        // Channel this direction reads from; handshake replies are also written to it.
+        AsynchronousSocketChannel readChannel;
+        // Channel received data is forwarded to; null on the client side until connected.
+        AsynchronousSocketChannel writeChannel;
+        // Buffer shared between the read and the following write of this direction.
+        ByteBuffer buffer;
+        // Current handshake step; CONNECTED once data is simply forwarded.
+        HandshakePhase handshakePhase;
+    }
+
+    /**
+     * Handshake steps of a connection: INITIAL before the protocol is detected, HTTP or
+     * SOCKS5_1 followed by SOCKS5_2 while handshaking, and CONNECTED once the handshake is done.
+     */
+    private enum HandshakePhase {
+        INITIAL, HTTP, SOCKS5_1, SOCKS5_2, CONNECTED
+    }
+
+    /**
+     * Accept handler of the listen socket: re-arms the accept for the next client, then
+     * starts reading the handshake from the client that was just accepted.
+     */
+    private class ServerConnectionHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
+
+        @Override
+        public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
+            // keep listening
+            serverSocketChannel.accept(attachment, this);
+
+            try {
+                clientChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
+            } catch (IOException e) {
+                LOG.error("Failed to set TCP_NODELAY after accept, closing channel", e);
+                closeChannel(clientChannel);
+                return;
+            }
+
+            // A fresh client starts in the INITIAL phase with no target channel yet.
+            final ProxyConnectionState state = new ProxyConnectionState();
+            state.readChannel = clientChannel;
+            state.buffer = ByteBuffer.allocate(4096);
+            state.handshakePhase = HandshakePhase.INITIAL;
+
+            // read from readChannel
+            clientChannel.read(state.buffer, TIMEOUT_IN_S, TimeUnit.SECONDS, state, readHandler);
+        }
+
+        @Override
+        public void failed(Throwable e, Object attachment) {
+            // A closed channel is the expected outcome of close(); only log other failures.
+            final boolean closed = e instanceof ClosedChannelException;
+            if (!closed) {
+                LOG.error("failed to accept connection ", e);
+            }
+            closeChannel(serverSocketChannel);
+        }
+    }
+
+    /**
+     * Read completion handler shared by all connections. During the handshake the received
+     * message is processed and the reply written back to the same channel; afterwards the
+     * received data is forwarded to the connection's write channel.
+     */
+    private class ProxyReadHandler implements CompletionHandler<Integer, ProxyConnectionState> {
+
+        @Override
+        public void completed(Integer result, ProxyConnectionState attachment) {
+            // connection closed
+            if (result == -1) {
+                LOG.info("read connection reached end of file ({})", attachment.readChannel);
+                closeChannel(attachment.readChannel);
+                shutdownOutput(attachment.writeChannel);
+                return;
+            }
+            LOG.info("read {} bytes (from {})", result, attachment.readChannel);
+
+            attachment.buffer.flip();
+
+            if (isInHandshake(attachment)) {
+                // The handshake reply goes back to the channel the request came from.
+                if (processHandshakeMessages(attachment)) {
+                    attachment.readChannel.write(attachment.buffer, TIMEOUT_IN_S, TimeUnit.SECONDS, attachment, writeHandler);
+                }
+                return;
+            }
+
+            // handshake done
+            final AsynchronousSocketChannel target = attachment.writeChannel;
+            if (target == null) {
+                LOG.error("Invalid");
+                closeChannel(attachment.readChannel);
+                return;
+            }
+            if (!target.isOpen()) {
+                closeChannel(attachment.readChannel);
+                return;
+            }
+
+            // forward data to writeChannel
+            target.write(attachment.buffer, TIMEOUT_IN_S, TimeUnit.SECONDS, attachment, writeHandler);
+        }
+
+        @Override
+        public void failed(Throwable e, ProxyConnectionState attachment) {
+            final boolean closed = e instanceof ClosedChannelException;
+            if (!closed) {
+                LOG.info("read failed", e);
+            }
+            closeChannel(attachment.writeChannel);
+            closeChannel(attachment.readChannel);
+        }
+    }
+
+    /**
+     * Write completion handler shared by all connections: after a write completes the
+     * buffer is reset and the next read from the connection's read channel is started.
+     */
+    private class ProxyWriteHandler implements CompletionHandler<Integer, ProxyConnectionState> {
+
+        @Override
+        public void completed(Integer result, ProxyConnectionState attachment) {
+            // connection closed
+            if (result == -1) {
+                LOG.info("write connection closed");
+                closeChannel(attachment.writeChannel);
+                closeChannel(attachment.readChannel);
+                return;
+            }
+            LOG.debug("wrote {} bytes", result);
+
+            // NOTE(review): the buffer is cleared without checking hasRemaining(); if a write
+            // completes partially the unwritten bytes are dropped - confirm this is acceptable.
+            attachment.buffer.clear();
+
+            if (!attachment.readChannel.isOpen()) {
+                closeChannel(attachment.writeChannel);
+                return;
+            }
+            // This read has no timeout, unlike the initial reads on each channel.
+            attachment.readChannel.read(attachment.buffer, attachment, readHandler);
+        }
+
+        @Override
+        public void failed(Throwable e, ProxyConnectionState attachment) {
+            final boolean closed = e instanceof ClosedChannelException;
+            if (!closed) {
+                LOG.info("write failed", e);
+            }
+            closeChannel(attachment.writeChannel);
+            closeChannel(attachment.readChannel);
+        }
+    }
+}
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
index 272cd86..402608b 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
@@ -22,12 +22,16 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import java.util.function.Supplier;
+
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import io.netty.handler.proxy.ProxyHandler;
+
 /**
  * Test for class TransportOptions
  */
@@ -64,13 +68,13 @@ public class TransportOptionsTest extends QpidJmsTestCase {
     private static final String[] DISABLED_CIPHERS = new String[] {"CIPHER_C"};
 
     private static final SSLContext SSL_CONTEXT = 
Mockito.mock(SSLContext.class);
+    private static final Supplier<ProxyHandler> PROXY_HANDLER_SUPPLIER = () -> 
Mockito.mock(ProxyHandler.class);
 
     private static final String JAVAX_NET_SSL_KEY_STORE = 
"javax.net.ssl.keyStore";
     private static final String JAVAX_NET_SSL_KEY_STORE_PASSWORD = 
"javax.net.ssl.keyStorePassword";
     private static final String JAVAX_NET_SSL_TRUST_STORE = 
"javax.net.ssl.trustStore";
     private static final String JAVAX_NET_SSL_TRUST_STORE_PASSWORD = 
"javax.net.ssl.trustStorePassword";
 
-
     @Test
     public void testCreate() {
         TransportOptions options = new TransportOptions();
@@ -93,6 +97,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         assertNull(options.getTrustStorePassword());
         assertNull(options.getKeyAlias());
         assertNull(options.getSslContextOverride());
+        assertNull(options.getProxyHandlerSupplier());
     }
 
     @Test
@@ -139,6 +144,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         assertEquals(LOCAL_ADDRESS,options.getLocalAddress());
         assertEquals(LOCAL_PORT,options.getLocalPort());
         assertEquals(SSL_CONTEXT, options.getSslContextOverride());
+        assertEquals(PROXY_HANDLER_SUPPLIER, 
options.getProxyHandlerSupplier());
         assertArrayEquals(ENABLED_PROTOCOLS,options.getEnabledProtocols());
         assertArrayEquals(DISABLED_PROTOCOLS,options.getDisabledProtocols());
         assertArrayEquals(ENABLED_CIPHERS,options.getEnabledCipherSuites());
@@ -220,6 +226,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         assertEquals(KEY_ALIAS, options.getKeyAlias());
         assertEquals(CONTEXT_PROTOCOL, options.getContextProtocol());
         assertEquals(SSL_CONTEXT, options.getSslContextOverride());
+        assertEquals(PROXY_HANDLER_SUPPLIER, 
options.getProxyHandlerSupplier());
         assertArrayEquals(ENABLED_PROTOCOLS,options.getEnabledProtocols());
         assertArrayEquals(DISABLED_PROTOCOLS,options.getDisabledProtocols());
         assertArrayEquals(ENABLED_CIPHERS,options.getEnabledCipherSuites());
@@ -253,6 +260,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         options.setConnectTimeout(TEST_CONNECT_TIMEOUT);
         options.setDefaultSslPort(TEST_DEFAULT_SSL_PORT);
         options.setSslContextOverride(SSL_CONTEXT);
+        options.setProxyHandlerSupplier(PROXY_HANDLER_SUPPLIER);
 
         return options;
     }
@@ -328,6 +336,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         options.setDisabledCipherSuites(DISABLED_CIPHERS);
         options.setLocalAddress(LOCAL_ADDRESS);
         options.setLocalPort(LOCAL_PORT);
+        options.setProxyHandlerSupplier(PROXY_HANDLER_SUPPLIER);
 
         return options;
     }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
index 2046854..b22e916 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
@@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -92,6 +93,7 @@ public abstract class NettyServer implements AutoCloseable {
     private volatile SslHandler sslHandler;
     private volatile HandshakeComplete handshakeComplete;
     private final CountDownLatch handshakeCompletion = new CountDownLatch(1);
+    private final AtomicInteger channelActiveCount = new AtomicInteger();
 
     private final AtomicBoolean started = new AtomicBoolean();
 
@@ -154,7 +156,7 @@ public abstract class NettyServer implements AutoCloseable {
         return handshakeComplete;
     }
 
-    protected URI getConnectionURI() throws Exception {
+    public URI getConnectionURI() throws Exception {
         if (!started.get()) {
             throw new IllegalStateException("Cannot get URI of non-started 
server");
         }
@@ -326,6 +328,8 @@ public abstract class NettyServer implements AutoCloseable {
                     }
                 });
             }
+
+            channelActiveCount.incrementAndGet();
         }
 
         @Override
@@ -383,4 +387,8 @@ public abstract class NettyServer implements AutoCloseable {
     protected SslHandler getSslHandler() {
         return sslHandler;
     }
+
+    /**
+     * @return the current value of the channel-active counter; presumably the number of
+     *         connections that became active on this server - confirm against the handler
+     *         that increments it
+     */
+    public int getChannelActiveCount() {
+        return channelActiveCount.get();
+    }
 }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
index 01d1f32..e5054d0 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 import org.apache.qpid.jms.transports.TransportOptions;
 import org.apache.qpid.jms.util.IdGenerator;
@@ -284,7 +285,7 @@ public class NettySimpleAmqpServer extends NettyServer {
             protonConnection.open();
 
             if (connectionIntercepter != null) {
-                failure = 
connectionIntercepter.interceptConnectionAttempt(connection);
+                failure = connectionIntercepter.apply(connection);
             }
 
             if (failure == null) {
@@ -496,9 +497,11 @@ public class NettySimpleAmqpServer extends NettyServer {
         this.connectionIntercepter = connectionIntercepter;
     }
 
-    public interface ConnectionIntercepter {
+    @FunctionalInterface
+    public interface ConnectionIntercepter extends Function<Connection, 
ErrorCondition> {
 
-        ErrorCondition interceptConnectionAttempt(Connection connection);
+        @Override
+        ErrorCondition apply(Connection connection);
 
     }
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
index 3a562cc..1205587 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
@@ -23,12 +23,17 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.proxy.TestProxy;
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
@@ -39,6 +44,10 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.handler.proxy.HttpProxyHandler;
+import io.netty.handler.proxy.ProxyHandler;
+import io.netty.handler.proxy.Socks5ProxyHandler;
+
 /**
  * Test basic functionality of the Netty based TCP Transport ruuing in secure 
mode (SSL).
  */
@@ -238,19 +247,34 @@ public class NettySslTransportTest extends 
NettyTcpTransportTest {
 
     @Test(timeout = 60 * 1000)
     public void testConnectToServerVerifyHost() throws Exception {
-        doConnectToServerVerifyHostTestImpl(true);
+        doConnectToServerVerifyHostTestImpl(true, null);
     }
 
     @Test(timeout = 60 * 1000)
     public void testConnectToServerNoVerifyHost() throws Exception {
-        doConnectToServerVerifyHostTestImpl(false);
+        doConnectToServerVerifyHostTestImpl(false, null);
     }
 
-    private void doConnectToServerVerifyHostTestImpl(boolean verifyHost) 
throws Exception, URISyntaxException, IOException, InterruptedException {
+    @Test(timeout = 60 * 1000)
+    public void testConnectViaSocksProxyToServerVerifyHost() throws Exception {
+        doConnectToServerVerifyHostTestImpl(true, TestProxy.ProxyType.SOCKS5);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testConnectViaSocksProxyToServerNoVerifyHost() throws 
Exception {
+        doConnectToServerVerifyHostTestImpl(false, TestProxy.ProxyType.SOCKS5);
+    }
+
+    protected void doConnectToServerVerifyHostTestImpl(boolean verifyHost, 
TestProxy.ProxyType proxyType) throws Exception {
         TransportOptions serverOptions = createServerOptions();
         serverOptions.setKeyStoreLocation(SERVER_WRONG_HOST_KEYSTORE);
 
+        TestProxy testProxy = null;
         try (NettyEchoServer server = createEchoServer(serverOptions)) {
+            if (proxyType != null) {
+                testProxy = new TestProxy(proxyType);
+                testProxy.start();
+            }
             server.start();
 
             int port = server.getServerPort();
@@ -258,6 +282,10 @@ public class NettySslTransportTest extends 
NettyTcpTransportTest {
 
             TransportOptions clientOptions = 
createClientOptionsIsVerify(verifyHost);
 
+            if (proxyType != null) {
+                configureProxyHandlerSupplier(proxyType, testProxy, 
clientOptions);
+            }
+
             if (verifyHost) {
                 assertTrue("Expected verifyHost to be true", 
clientOptions.isVerifyHost());
             } else {
@@ -286,6 +314,38 @@ public class NettySslTransportTest extends 
NettyTcpTransportTest {
             }
 
             transport.close();
+
+            if (proxyType != null) {
+                assertEquals(1, testProxy.getSuccessCount());
+            }
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    return server.getChannelActiveCount() == 1;
+                }
+            }, 10_000, 10));
+        } finally {
+            if (testProxy != null) {
+                testProxy.close();
+            }
+        }
+    }
+
+    /**
+     * Installs a proxy handler supplier on the client options that routes the connection
+     * through the given test proxy on localhost.
+     *
+     * @param proxyType the kind of proxy handler to create (SOCKS5 or HTTP)
+     * @param testProxy the running proxy whose port is used as the proxy address
+     * @param clientOptions the options to configure
+     * @throws IllegalArgumentException if the proxy type is neither SOCKS5 nor HTTP
+     */
+    protected void configureProxyHandlerSupplier(TestProxy.ProxyType proxyType, TestProxy testProxy, TransportOptions clientOptions) {
+        if (proxyType == TestProxy.ProxyType.SOCKS5) {
+            final SocketAddress socksAddress = new InetSocketAddress("localhost", testProxy.getPort());
+            clientOptions.setProxyHandlerSupplier(() -> new Socks5ProxyHandler(socksAddress));
+        } else if (proxyType == TestProxy.ProxyType.HTTP) {
+            final SocketAddress httpAddress = new InetSocketAddress("localhost", testProxy.getPort());
+            clientOptions.setProxyHandlerSupplier(() -> new HttpProxyHandler(httpAddress));
+        } else {
+            throw new IllegalArgumentException("Unknown proxy type:" + proxyType);
         }
     }
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
index b9f5b07..ea067bb 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
@@ -41,7 +41,6 @@ import org.apache.qpid.jms.JmsConnectionExtensions;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.transports.TransportOptions;
-import 
org.apache.qpid.jms.transports.netty.NettySimpleAmqpServer.ConnectionIntercepter;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.ConnectionError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -159,38 +158,31 @@ public class NettyTcpToMockServerTest extends 
QpidJmsTestCase {
 
             final CountDownLatch redirectComplete = new CountDownLatch(1);
 
-            server.setConnectionIntercepter(new ConnectionIntercepter() {
+            server.setConnectionIntercepter((protonConnection) -> {
 
-                @Override
-                public ErrorCondition 
interceptConnectionAttempt(org.apache.qpid.proton.engine.Connection connection) 
{
-                    ErrorCondition redirection = new 
ErrorCondition(ConnectionError.REDIRECT,
-                        "Server redirecting connection.");
+                ErrorCondition redirection = new 
ErrorCondition(ConnectionError.REDIRECT,
+                    "Server redirecting connection.");
 
-                    URI serverURI = null;
-                    try {
-                        serverURI = redirect.getConnectionURI();
-                    } catch (Exception e) {
-                        new RuntimeException();
-                    }
+                URI serverURI = null;
+                try {
+                    serverURI = redirect.getConnectionURI();
+                } catch (Exception e) {
+                    new RuntimeException();
+                }
 
-                    // Create standard redirection condition
-                    Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> 
();
-                    infoMap.put(OPEN_HOSTNAME, serverURI.getHost());
-                    infoMap.put(NETWORK_HOST, serverURI.getHost());
-                    infoMap.put(PORT, serverURI.getPort());
-                    redirection.setInfo(infoMap);
+                // Create standard redirection condition
+                Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> ();
+                infoMap.put(OPEN_HOSTNAME, serverURI.getHost());
+                infoMap.put(NETWORK_HOST, serverURI.getHost());
+                infoMap.put(PORT, serverURI.getPort());
+                redirection.setInfo(infoMap);
 
-                    return redirection;
-                }
+                return redirection;
             });
 
-            redirect.setConnectionIntercepter(new ConnectionIntercepter() {
-
-                @Override
-                public ErrorCondition 
interceptConnectionAttempt(org.apache.qpid.proton.engine.Connection connection) 
{
-                    redirectComplete.countDown();
-                    return null;
-                }
+            redirect.setConnectionIntercepter((protonConnection) -> {
+                redirectComplete.countDown();
+                return null;
             });
 
             server.start();
@@ -234,40 +226,33 @@ public class NettyTcpToMockServerTest extends QpidJmsTestCase {
             primary.setWebSocketPath("/primary");
             redirect.setWebSocketPath("/redirect");
 
-            primary.setConnectionIntercepter(new ConnectionIntercepter() {
-
-                @Override
-                public ErrorCondition interceptConnectionAttempt(org.apache.qpid.proton.engine.Connection connection) {
-                    ErrorCondition redirection = new ErrorCondition(ConnectionError.REDIRECT,
-                        "Server redirecting connection.");
-
-                    URI serverURI = null;
-                    try {
-                        serverURI = redirect.getConnectionURI();
-                    } catch (Exception e) {
-                        new RuntimeException();
-                    }
-
-                    // Create standard redirection condition
-                    Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> ();
-                    infoMap.put(OPEN_HOSTNAME, serverURI.getHost());
-                    infoMap.put(NETWORK_HOST, serverURI.getHost());
-                    infoMap.put(PORT, serverURI.getPort());
-                    infoMap.put(PATH, redirect.getWebSocketPath());
-                    infoMap.put(SCHEME, redirect.isSecureServer() ? "wss" : "ws");
-                    redirection.setInfo(infoMap);
-
-                    return redirection;
-                }
-            });
+            primary.setConnectionIntercepter((protonConnection) -> {
 
-            redirect.setConnectionIntercepter(new ConnectionIntercepter() {
+                ErrorCondition redirection = new ErrorCondition(ConnectionError.REDIRECT,
+                    "Server redirecting connection.");
 
-                @Override
-                public ErrorCondition interceptConnectionAttempt(org.apache.qpid.proton.engine.Connection connection) {
-                    redirectComplete.countDown();
-                    return null;
+                URI serverURI = null;
+                try {
+                    serverURI = redirect.getConnectionURI();
+                } catch (Exception e) {
+                    new RuntimeException();
                 }
+
+                // Create standard redirection condition
+                Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> ();
+                infoMap.put(OPEN_HOSTNAME, serverURI.getHost());
+                infoMap.put(NETWORK_HOST, serverURI.getHost());
+                infoMap.put(PORT, serverURI.getPort());
+                infoMap.put(PATH, redirect.getWebSocketPath());
+                infoMap.put(SCHEME, redirect.isSecureServer() ? "wss" : "ws");
+                redirection.setInfo(infoMap);
+
+                return redirection;
+            });
+
+            redirect.setConnectionIntercepter((protonConnection) -> {
+                redirectComplete.countDown();
+                return null;
             });
 
             primary.start();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index 87cf074..74ce189 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -27,14 +27,19 @@ import static org.junit.Assume.assumeTrue;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.proxy.TestProxy;
+import org.apache.qpid.jms.test.proxy.TestProxy.ProxyType;
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
@@ -53,6 +58,8 @@ import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.kqueue.KQueue;
 import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.handler.proxy.ProxyHandler;
+import io.netty.handler.proxy.Socks5ProxyHandler;
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.ResourceLeakDetector.Level;
 
@@ -617,6 +624,57 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
         doTestEpollSupport(false);
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServerViaProxy() throws Exception {
+        try (TestProxy testProxy = new TestProxy(ProxyType.SOCKS5);
+             NettyEchoServer server = createEchoServer(createServerOptions())) {
+
+            testProxy.start();
+            server.start();
+
+            int port = server.getServerPort();
+            LOG.info("Echo server bound at: {}", port);
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            TransportOptions clientOptions = createClientOptions();
+            SocketAddress proxyAddress = new InetSocketAddress("localhost", testProxy.getPort());
+            Supplier<ProxyHandler> proxyHandlerFactory = () -> {
+                return new Socks5ProxyHandler(proxyAddress);
+            };
+            clientOptions.setProxyHandlerSupplier(proxyHandlerFactory);
+
+            Transport transport = createTransport(serverLocation, testListener, clientOptions);
+            try {
+                transport.connect(null, null);
+                LOG.info("Connected to server:{} as expected.", serverLocation);
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue(transport.isConnected());
+            assertEquals(serverLocation, transport.getRemoteLocation());
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    return server.getChannelActiveCount() == 1;
+                }
+            }, 10_000, 10));
+
+            assertEquals(1, testProxy.getSuccessCount());
+
+            transport.close();
+
+            // Additional close should not fail or cause other problems.
+            transport.close();
+
+        }
+
+        assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
     private void doTestEpollSupport(boolean useEpoll) throws Exception {
         assumeTrue(Epoll.isAvailable());
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
index 4e3164c..655be3f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
@@ -23,11 +23,16 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Supplier;
 
 import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.proxy.TestProxy;
+import org.apache.qpid.jms.test.proxy.TestProxy.ProxyType;
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
@@ -40,6 +45,8 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
+import io.netty.handler.proxy.HttpProxyHandler;
+import io.netty.handler.proxy.ProxyHandler;
 
 /**
  * Test the Netty based WebSocket Transport
@@ -387,4 +394,48 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
         assertTrue(exceptions.isEmpty());
         assertTrue(data.isEmpty());
     }
+
+    @Test(timeout = 60000)
+    public void testConnectViaHttpProxy() throws Exception {
+        try (TestProxy testProxy = new TestProxy(ProxyType.HTTP);
+             NettyEchoServer server = createEchoServer(createServerOptions())) {
+            testProxy.start();
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            TransportOptions clientOptions = createClientOptions();
+            SocketAddress proxyAddress = new InetSocketAddress("localhost", testProxy.getPort());
+            Supplier<ProxyHandler> proxyHandlerFactory = () -> {
+                return new HttpProxyHandler(proxyAddress);
+            };
+            clientOptions.setProxyHandlerSupplier(proxyHandlerFactory);
+
+            Transport transport = createTransport(serverLocation, testListener, clientOptions);
+            try {
+                transport.connect(null, null);
+                LOG.info("Connected to server:{} as expected.", serverLocation);
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+            }
+
+            assertTrue(transport.isConnected());
+            assertEquals(serverLocation, transport.getRemoteLocation());
+
+            transport.close();
+
+            assertEquals(1, testProxy.getSuccessCount());
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    return server.getChannelActiveCount() == 1;
+                }
+            }, 10_000, 10));
+        }
+
+        assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWssTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWssTransportTest.java
index f157537..1f2a4bd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWssTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWssTransportTest.java
@@ -18,8 +18,10 @@ package org.apache.qpid.jms.transports.netty;
 
 import java.net.URI;
 
+import org.apache.qpid.jms.test.proxy.TestProxy;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
+import org.junit.Test;
 
 /**
  * Test the NettyWsTransport with channel level security enabled.
@@ -39,4 +41,14 @@ public class NettyWssTransportTest extends NettySslTransportTest {
             return new NettyWsTransport(listener, serverLocation, options, true);
         }
     }
+
+    @Test(timeout = 60 * 1000)
+    public void testConnectViaHttpProxyToServerVerifyHost() throws Exception {
+        doConnectToServerVerifyHostTestImpl(true, TestProxy.ProxyType.HTTP);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testConnectViaHttpProxyToServerNoVerifyHost() throws Exception {
+        doConnectToServerVerifyHostTestImpl(false, TestProxy.ProxyType.HTTP);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to