This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new e9de35dc PROTON-2841 Adds support for enabling WS compression to the 
transport
e9de35dc is described below

commit e9de35dcd4d7a2c095aa984c4f67059c98036e61
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Wed Jul 24 15:49:55 2024 -0400

    PROTON-2841 Adds support for enabling WS compression to the transport
    
    Optional (default disabled) WS compression added to Transport 
implementations
    and the test peer. Peers can check if WS compression is enabled after a 
connection
    is established.
---
 .../qpid/protonj2/client/TransportOptions.java     | 28 ++++++++++
 .../transport/netty4/WebSocketTransport.java       |  4 ++
 .../transport/netty5/WebSocketTransport.java       |  5 +-
 .../protonj2/client/impl/WsConnectionTest.java     | 63 ++++++++++++++++++++++
 .../client/transport/netty4/NettyServer.java       | 12 +++++
 .../client/transport/netty4/TcpTransportTest.java  |  4 ++
 .../transport/netty4/WebSocketTransportTest.java   | 50 +++++++++++++++++
 .../client/transport/netty5/NettyServer.java       | 12 +++++
 .../client/transport/netty5/TcpTransportTest.java  |  4 ++
 .../transport/netty5/WebSocketTransportTest.java   | 50 +++++++++++++++++
 .../protonj2/test/driver/ProtonTestClient.java     |  4 ++
 .../test/driver/ProtonTestClientOptions.java       | 11 ++++
 .../protonj2/test/driver/ProtonTestServer.java     |  6 +++
 .../test/driver/ProtonTestServerOptions.java       | 12 +++++
 .../protonj2/test/driver/netty/NettyClient.java    |  5 ++
 .../protonj2/test/driver/netty/NettyServer.java    |  5 ++
 .../test/driver/netty/netty4/Netty4Client.java     | 54 +++++++++++++++++++
 .../test/driver/netty/netty4/Netty4Server.java     | 54 +++++++++++++++++++
 .../test/driver/netty/netty5/Netty5Client.java     | 52 ++++++++++++++++++
 .../test/driver/netty/netty5/Netty5Server.java     | 49 +++++++++++++++++
 .../test/driver/ProtonTestWSClientTest.java        | 61 +++++++++++++++++++++
 21 files changed, 544 insertions(+), 1 deletion(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java
index f681da61..47e4f3c7 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/TransportOptions.java
@@ -40,6 +40,7 @@ public class TransportOptions implements Cloneable {
     public static final boolean DEFAULT_TRACE_BYTES = false;
     public static final int DEFAULT_LOCAL_PORT = 0;
     public static final boolean DEFAULT_USE_WEBSOCKETS = false;
+    public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false;
     public static final int DEFAULT_WEBSOCKET_MAX_FRAME_SIZE = 65535;
     private static final String[] DEFAULT_NATIVEIO_PREFERENCES_ARRAY = { 
"EPOLL", "KQUEUE" };
     public static final List<String> DEFAULT_NATIVEIO_PREFERENCES =
@@ -62,6 +63,7 @@ public class TransportOptions implements Cloneable {
     private boolean useWebSockets = DEFAULT_USE_WEBSOCKETS;
     private String webSocketPath;
     private int webSocketMaxFrameSize = DEFAULT_WEBSOCKET_MAX_FRAME_SIZE;
+    private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION;
 
     private final Map<String, String> webSocketHeaders = new HashMap<>();
 
@@ -454,6 +456,31 @@ public class TransportOptions implements Cloneable {
         return this;
     }
 
+    /**
+     * @return the configured value for the WebSocket compression support 
enabled flag.
+     */
+    public boolean webSocketCompression() {
+        return webSocketCompression;
+    }
+
+    /**
+     * Set to true to configure the transport layer as a WebSocket based 
connection that
+     * supports compression of the WebSocket packets. This option simply allows 
the client
+     * to support compression if the server offers support but does not 
influence the server
+     * side. If the server does not offer support for compression of WS 
packets then this
+     * value has no effect on the WS packets and they remain uncompressed as 
if not enabled.
+     * (default is disabled).
+     *
+     * @param enabled
+     *                 should the transport support WebSocket compression if 
server offers it.
+     *
+     * @return this {@link TransportOptions} instance.
+     */
+    public TransportOptions webSocketCompression(boolean enabled) {
+        this.webSocketCompression = enabled;
+        return this;
+    }
+
     /**
      * Copy all configuration into the given {@link TransportOptions} from 
this instance.
      *
@@ -481,6 +508,7 @@ public class TransportOptions implements Cloneable {
         other.webSocketPath(webSocketPath());
         other.webSocketHeaders().putAll(webSocketHeaders);
         other.webSocketMaxFrameSize(webSocketMaxFrameSize());
+        other.webSocketCompression(webSocketCompression());
 
         return other;
     }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
index f150a2c9..5d3aae6c 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
@@ -47,6 +47,7 @@ import 
io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+import 
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.ScheduledFuture;
@@ -151,6 +152,9 @@ public class WebSocketTransport extends TcpTransport {
     protected void addAdditionalHandlers(ChannelPipeline pipeline) {
         pipeline.addLast(new HttpClientCodec());
         pipeline.addLast(new HttpObjectAggregator(8192));
+        if (options.webSocketCompression()) {
+            pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
+        }
     }
 
     @Override
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
index 0c7fb599..62c0097d 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
@@ -49,6 +49,7 @@ import 
io.netty5.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import 
io.netty5.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty5.handler.codec.http.websocketx.WebSocketVersion;
+import 
io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
 import io.netty5.util.concurrent.Future;
 import io.netty5.util.concurrent.FutureListener;
 
@@ -98,6 +99,9 @@ public class WebSocketTransport extends TcpTransport {
     protected void addAdditionalHandlers(ChannelPipeline pipeline) {
         pipeline.addLast(new HttpClientCodec());
         pipeline.addLast(new HttpObjectAggregator<DefaultHttpContent>(8192));
+        if (options.webSocketCompression()) {
+            pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
+        }
     }
 
     @Override
@@ -169,7 +173,6 @@ public class WebSocketTransport extends TcpTransport {
             super.channelActive(context);
         }
 
-        @SuppressWarnings("resource")
         @Override
         protected void messageReceived(ChannelHandlerContext ctx, Object 
message) throws Exception {
             LOG.trace("New data read: incoming: {}", message);
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java
index caa1546c..92aed113 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/WsConnectionTest.java
@@ -16,15 +16,23 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.net.URI;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.DeliveryMode;
+import org.apache.qpid.protonj2.client.Message;
+import org.apache.qpid.protonj2.client.Sender;
+import org.apache.qpid.protonj2.client.SenderOptions;
+import org.apache.qpid.protonj2.client.Session;
+import org.apache.qpid.protonj2.client.Tracker;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServerOptions;
@@ -88,4 +96,59 @@ public class WsConnectionTest extends ConnectionTest {
             peer.waitForScriptToCompleteIgnoreErrors();
         }
     }
+
+    @Test
+    public void testSendMessageWithLargeStringBodyWithCompressionEnabled() 
throws Exception {
+        final int BODY_SIZE = 16384;
+
+        final String payload = new String("A").repeat(BODY_SIZE);
+
+        try (ProtonTestServer peer = new 
ProtonTestServer(testServerOptions().setWebSocketCompression(true))) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.remoteFlow().withLinkCredit(10).queue();
+            peer.expectAttach().respond();  // Open a receiver to ensure 
sender link has processed
+            peer.expectFlow();              // the inbound flow frame we sent 
previously before send.
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            ConnectionOptions connectOptions = new ConnectionOptions();
+            connectOptions.transportOptions().webSocketCompression(true);
+            connectOptions.transportOptions().useWebSockets(true);
+            connectOptions.traceFrames(true);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), connectOptions).openFuture().get();
+
+            Session session = connection.openSession().openFuture().get();
+            SenderOptions options = new 
SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE);
+            Sender sender = session.openSender("test-qos", options);
+
+            // Gates send on remote flow having been sent and received
+            session.openReceiver("dummy").openFuture().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectTransfer().withMessage().withValue(payload);
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            final Message<String> message = Message.create(payload);
+            final Tracker tracker = sender.send(message);
+
+            assertNotNull(tracker);
+            assertNotNull(tracker.settlementFuture().isDone());
+            assertNotNull(tracker.settlementFuture().get().settled());
+
+            sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
index c02f890a..6164acb9 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
@@ -61,6 +61,7 @@ import 
io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
 import 
io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
+import 
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.ssl.SslHandler;
@@ -112,6 +113,10 @@ public abstract class NettyServer implements AutoCloseable 
{
         return options.useWebSockets();
     }
 
+    public boolean isUseWebSocketCompression() {
+        return options.webSocketCompression();
+    }
+
     public String getWebSocketPath() {
         return webSocketPath;
     }
@@ -210,9 +215,16 @@ public abstract class NettyServer implements AutoCloseable 
{
                         ch.pipeline().addLast(sslHandler);
                     }
 
+                    if (options.traceBytes()) {
+                        ch.pipeline().addLast("logger", new 
LoggingHandler(getClass()));
+                    }
+
                     if (isWebSocketServer()) {
                         ch.pipeline().addLast(new HttpServerCodec());
                         ch.pipeline().addLast(new HttpObjectAggregator(65536));
+                        if (isUseWebSocketCompression()) {
+                            ch.pipeline().addLast(new 
WebSocketServerCompressionHandler());
+                        }
                         ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
                     }
 
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
index 3d17e55f..229e553a 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
@@ -1167,6 +1167,10 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
         return createEchoServer(createServerTransportOptions(), options, 
needClientAuth);
     }
 
+    protected final NettyEchoServer createEchoServer(TransportOptions options) 
{
+        return new NettyEchoServer(options, createServerSSLOptions(), false);
+    }
+
     protected final NettyEchoServer createEchoServer(TransportOptions options, 
SslOptions sslOptions, boolean needClientAuth) {
         return new NettyEchoServer(options, sslOptions, needClientAuth);
     }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java
index 7927d11c..899aa85c 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransportTest.java
@@ -351,6 +351,56 @@ public class WebSocketTransportTest extends 
TcpTransportTest {
         }
     }
 
+    @Test
+    public void 
testTransportConnectionDoesNotDropWhenServerAndClientUseCompressionWithLargePayloads()
 throws Exception {
+        final int FRAME_SIZE = 16384; // This value would exceed the set max 
frame size without compression.
+
+        final ProtonBuffer sendBuffer = allocator.allocate(FRAME_SIZE);
+        for (int i = 0; i < FRAME_SIZE; ++i) {
+            sendBuffer.writeByte((byte) 'A');
+        }
+
+        try (NettyEchoServer server = 
createEchoServer(createServerTransportOptions().webSocketCompression(true).traceBytes(true)))
 {
+            // Without compression the server would reject this data as too 
large and close the connection.
+            server.setMaxFrameSize(FRAME_SIZE / 2);
+            server.start();
+
+            final int port = server.getServerPort();
+
+            List<Transport> transports = new ArrayList<>();
+
+            final Transport transport = 
createTransport(createTransportOptions().webSocketMaxFrameSize(FRAME_SIZE)
+                                                                               
 .webSocketCompression(true), createSSLOptions());
+
+            assertTrue(transport instanceof WebSocketTransport);
+
+            try {
+                // Transport allows bigger frames in so that server is the one 
causing the failure.
+                transport.connect(HOSTNAME, port, testListener).awaitConnect();
+                transports.add(transport);
+                transport.writeAndFlush(sendBuffer.copy());
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + HOSTNAME + 
":" + port + " but got exception: " + e);
+            }
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    try {
+                        transport.writeAndFlush(sendBuffer.copy());
+                    } catch (IOException e) {
+                        LOG.info("Transport send caught error:", e);
+                        return false;
+                    }
+
+                    return true;
+                }
+            }, 10000, 10), "Transport should not have lost connection");
+
+            transport.close();
+        }
+    }
+
     @Test
     public void testConfiguredHttpHeadersArriveAtServer() throws Exception {
         try (NettyEchoServer server = createEchoServer()) {
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java
index 2a3bbd85..ce3a75be 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/NettyServer.java
@@ -61,6 +61,7 @@ import 
io.netty5.handler.codec.http.websocketx.ContinuationWebSocketFrame;
 import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
 import 
io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakeCompletionEvent;
 import io.netty5.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import 
io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
 import io.netty5.handler.logging.LogLevel;
 import io.netty5.handler.logging.LoggingHandler;
 import io.netty5.handler.ssl.SslHandler;
@@ -112,6 +113,10 @@ public abstract class NettyServer implements AutoCloseable 
{
         return options.useWebSockets();
     }
 
+    public boolean isUseWebSocketCompression() {
+        return options.webSocketCompression();
+    }
+
     public String getWebSocketPath() {
         return webSocketPath;
     }
@@ -210,9 +215,16 @@ public abstract class NettyServer implements AutoCloseable 
{
                         ch.pipeline().addLast(sslHandler);
                     }
 
+                    if (options.traceBytes()) {
+                        ch.pipeline().addLast("logger", new 
LoggingHandler(getClass()));
+                    }
+
                     if (isWebSocketServer()) {
                         ch.pipeline().addLast(new HttpServerCodec());
                         ch.pipeline().addLast(new 
HttpObjectAggregator<DefaultHttpContent>(65536));
+                        if (isUseWebSocketCompression()) {
+                            ch.pipeline().addLast(new 
WebSocketServerCompressionHandler());
+                        }
                         ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
                     }
 
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java
index 62f33bfd..3999bcb3 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/TcpTransportTest.java
@@ -1095,6 +1095,10 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
         return createEchoServer(createServerTransportOptions(), options, 
needClientAuth);
     }
 
+    protected final NettyEchoServer createEchoServer(TransportOptions options) 
{
+        return new NettyEchoServer(options, createServerSSLOptions(), false);
+    }
+
     protected final NettyEchoServer createEchoServer(TransportOptions options, 
SslOptions sslOptions, boolean needClientAuth) {
         return new NettyEchoServer(options, sslOptions, needClientAuth);
     }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java
index a8afb0d2..8b850b28 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransportTest.java
@@ -350,6 +350,56 @@ public class WebSocketTransportTest extends 
TcpTransportTest {
         }
     }
 
+    @Test
+    public void 
testTransportConnectionDoesNotDropWhenServerAndClientUseCompressionWithLargePayloads()
 throws Exception {
+        final int FRAME_SIZE = 16384; // This value would exceed the set max 
frame size without compression.
+
+        final ProtonBuffer sendBuffer = allocator.allocate(FRAME_SIZE);
+        for (int i = 0; i < FRAME_SIZE; ++i) {
+            sendBuffer.writeByte((byte) 'A');
+        }
+
+        try (NettyEchoServer server = 
createEchoServer(createServerTransportOptions().webSocketCompression(true).traceBytes(true)))
 {
+            // Without compression the server would reject this data as too 
large and close the connection.
+            server.setMaxFrameSize(FRAME_SIZE / 2);
+            server.start();
+
+            final int port = server.getServerPort();
+
+            List<Transport> transports = new ArrayList<>();
+
+            final Transport transport = 
createTransport(createTransportOptions().webSocketMaxFrameSize(FRAME_SIZE)
+                                                                               
 .webSocketCompression(true), createSSLOptions());
+
+            assertTrue(transport instanceof WebSocketTransport);
+
+            try {
+                // Transport allows bigger frames in so that server is the one 
causing the failure.
+                transport.connect(HOSTNAME, port, testListener).awaitConnect();
+                transports.add(transport);
+                transport.writeAndFlush(sendBuffer.copy());
+            } catch (Exception e) {
+                fail("Should have connected to the server at " + HOSTNAME + 
":" + port + " but got exception: " + e);
+            }
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    try {
+                        transport.writeAndFlush(sendBuffer.copy());
+                    } catch (IOException e) {
+                        LOG.info("Transport send caught error:", e);
+                        return false;
+                    }
+
+                    return true;
+                }
+            }, 10000, 10), "Transport should not have lost connection");
+
+            transport.close();
+        }
+    }
+
     @Test
     public void testConfiguredHttpHeadersArriveAtServer() throws Exception {
         try (NettyEchoServer server = createEchoServer()) {
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index 97c59e28..e4249afc 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -76,6 +76,10 @@ public class ProtonTestClient extends ProtonTestPeer 
implements AutoCloseable {
         return driver;
     }
 
+    public boolean isWSCompressionActive() {
+        return client.isWSCompressionActive();
+    }
+
     @Override
     protected void processConnectionEstablished() {
         LOG.trace("AMQP Client connected to remote.");
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java
index c40cf207..bac99f04 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientOptions.java
@@ -49,6 +49,7 @@ public class ProtonTestClientOptions implements Cloneable {
     public static final boolean DEFAULT_USE_WEBSOCKETS = false;
     public static final boolean DEFAULT_FRAGMENT_WEBSOCKET_WRITES = false;
     public static final String DEFAULT_WEBSOCKET_PATH = "/";
+    public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false;
     public static final int DEFAULT_WEBSOCKET_MAX_FRAME_SIZE = 65535;
     public static final boolean DEFAULT_SECURE_SERVER = false;
     public static final boolean DEFAULT_NEEDS_CLIENT_AUTH = false;
@@ -75,6 +76,7 @@ public class ProtonTestClientOptions implements Cloneable {
     private boolean fragmentWebSocketWrites = 
DEFAULT_FRAGMENT_WEBSOCKET_WRITES;
     private String webSocketPath = DEFAULT_WEBSOCKET_PATH;
     private int webSocketMaxFrameSize = DEFAULT_WEBSOCKET_MAX_FRAME_SIZE;
+    private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION;
 
     private boolean secure = DEFAULT_SECURE_SERVER;
     private boolean needClientAuth = DEFAULT_NEEDS_CLIENT_AUTH;
@@ -554,6 +556,14 @@ public class ProtonTestClientOptions implements Cloneable {
         this.webSocketMaxFrameSize = webSocketMaxFrameSize;
     }
 
+    public boolean isWebSocketCompression() {
+        return webSocketCompression;
+    }
+
+    public void setWebSocketCompression(boolean enabled) {
+        this.webSocketCompression = enabled;
+    }
+
     protected ProtonTestClientOptions copyOptions(ProtonTestClientOptions 
copy) {
         copy.setReceiveBufferSize(getReceiveBufferSize());
         copy.setSendBufferSize(getSendBufferSize());
@@ -587,6 +597,7 @@ public class ProtonTestClientOptions implements Cloneable {
         copy.setFragmentWrites(isFragmentWrites());
         copy.setWebSocketPath(getWebSocketPath());
         copy.setWebSocketMaxFrameSize(getWebSocketMaxFrameSize());
+        copy.setWebSocketCompression(isWebSocketCompression());
 
         return copy;
     }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index b4ab29ce..9b7d6420 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -131,6 +131,7 @@ public class ProtonTestServer extends ProtonTestPeer {
      *
      * @return this test peer instance.
      */
+    @Override
     public ProtonTestPeer dropAfterLastHandler() {
         getDriver().addScriptedElement(new ConnectionDropAction(this));
         return this;
@@ -147,6 +148,7 @@ public class ProtonTestServer extends ProtonTestPeer {
      *
      * @return this test peer instance.
      */
+    @Override
     public ProtonTestPeer dropAfterLastHandler(int delay) {
         getDriver().addScriptedElement(new 
ConnectionDropAction(this).afterDelay(delay));
         return this;
@@ -176,6 +178,10 @@ public class ProtonTestServer extends ProtonTestPeer {
         return server.getClientPort();
     }
 
+    public boolean isWSCompressionActive() {
+        return server.isWSCompressionActive();
+    }
+
     @Override
     public AMQPTestDriver getDriver() {
         return driver;
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java
index 06b85e75..23ebff32 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServerOptions.java
@@ -48,6 +48,7 @@ public class ProtonTestServerOptions implements Cloneable {
     public static final int DEFAULT_LOCAL_PORT = 0;
     public static final boolean DEFAULT_USE_WEBSOCKETS = false;
     public static final boolean DEFAULT_FRAGMENT_WEBSOCKET_WRITES = false;
+    public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false;
     public static final boolean DEFAULT_SECURE_SERVER = false;
     public static final boolean DEFAULT_NEEDS_CLIENT_AUTH = false;
 
@@ -70,6 +71,7 @@ public class ProtonTestServerOptions implements Cloneable {
     private int localPort = DEFAULT_LOCAL_PORT;
     private boolean traceBytes = DEFAULT_TRACE_BYTES;
     private boolean useWebSockets = DEFAULT_USE_WEBSOCKETS;
+    private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION;
     private boolean fragmentWebSocketWrites = 
DEFAULT_FRAGMENT_WEBSOCKET_WRITES;
 
     private boolean secure = DEFAULT_SECURE_SERVER;
@@ -612,6 +614,15 @@ public class ProtonTestServerOptions implements Cloneable {
         return this;
     }
 
+    public boolean isWebSocketCompression() {
+        return webSocketCompression;
+    }
+
+    public ProtonTestServerOptions setWebSocketCompression(boolean enabled) {
+        this.webSocketCompression = enabled;
+        return this;
+    }
+
     protected ProtonTestServerOptions copyOptions(ProtonTestServerOptions 
copy) {
         copy.setReceiveBufferSize(getReceiveBufferSize());
         copy.setSendBufferSize(getSendBufferSize());
@@ -643,6 +654,7 @@ public class ProtonTestServerOptions implements Cloneable {
         copy.setNeedClientAuth(isNeedClientAuth());
         copy.setUseWebSockets(isUseWebSockets());
         copy.setFragmentWrites(isFragmentWrites());
+        copy.setWebSocketCompression(isWebSocketCompression());
 
         return copy;
     }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java
index b90af803..550943da 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyClient.java
@@ -66,4 +66,9 @@ public interface NettyClient extends AutoCloseable {
      */
     URI getRemoteURI();
 
+    /**
+     * Reports whether WebSocket compression is in effect for this client's connection.
+     *
+     * @return true if the connected client has WS compression activated by the server.
+     */
+    boolean isWSCompressionActive();
+
 }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
index 99d775c8..b5b91c51 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
@@ -92,6 +92,11 @@ public interface NettyServer extends AutoCloseable {
      */
     int getClientPort();
 
+    /**
+     * Reports whether WebSocket compression was negotiated with the connected client.
+     *
+     * @return true if a connected client has negotiated WS compression.
+     */
+    boolean isWSCompressionActive();
+
     /**
      * @return has the SSL handshake for a client completed successfully.
      */
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
index 3bb61d50..b31c6e90 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
@@ -38,6 +38,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -52,8 +53,10 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
@@ -65,6 +68,7 @@ import 
io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+import 
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
@@ -88,6 +92,8 @@ public final class Netty4Client implements NettyClient {
     private Channel channel;
     private String host;
     private int port;
+    private boolean wsCompressionRequest;
+    private boolean wsCompressionResponse;
     protected volatile IOException failureCause;
     private final ProtonTestClientOptions options;
     private volatile SslHandler sslHandler;
@@ -223,6 +229,15 @@ public final class Netty4Client implements NettyClient {
         return options.isSecure();
     }
 
+    @Override
+    public boolean isWSCompressionActive() {
+        // The answer is only meaningful while there is a live channel to ask about.
+        final boolean connected = channel != null && channel.isActive();
+
+        if (!connected) {
+            throw new IllegalStateException("Channel is not connected or has closed");
+        }
+
+        // Both halves of the upgrade handshake must have carried the compression extension.
+        return wsCompressionRequest && wsCompressionResponse;
+    }
+
     @Override
     public URI getRemoteURI() {
         if (host != null) {
@@ -400,6 +415,41 @@ public final class Netty4Client implements NettyClient {
         }
     }
 
+    /**
+     * Observes the HTTP upgrade exchange passing through the client pipeline and records
+     * whether the outgoing request and the incoming response each list the
+     * permessage-deflate extension. Messages are always forwarded unchanged.
+     */
+    private class ClientWSCompressionObserver extends ChannelDuplexHandler  {
+
+        final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions";
+        final String WS_PERMESSAGE_DEFLATE = "permessage-deflate";
+        final String WS_UPGRADE = "upgrade";
+
+        /**
+         * Inspects an inbound full HTTP response and, when it carries both the upgrade and
+         * the extensions headers, records whether permessage-deflate is listed.
+         */
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object message) {
+            if (message instanceof FullHttpResponse) {
+                FullHttpResponse response = (FullHttpResponse) message;
+                HttpHeaders headers = response.headers();
+
+                // What is read on the client side is the response, so track it as such.
+                if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) {
+                    wsCompressionResponse = headers.get(WS_EXTENSIONS_SECTION).contains(WS_PERMESSAGE_DEFLATE);
+                }
+            }
+
+            ctx.fireChannelRead(message);
+        }
+
+        /**
+         * Inspects an outbound full HTTP request and, when it carries both the upgrade and
+         * the extensions headers, records whether permessage-deflate is listed.
+         */
+        @Override
+        public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) throws Exception {
+            if (message instanceof FullHttpRequest) {
+                FullHttpRequest request = (FullHttpRequest) message;
+                HttpHeaders headers = request.headers();
+
+                // What is written on the client side is the request, so track it as such.
+                if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) {
+                    wsCompressionRequest = headers.get(WS_EXTENSIONS_SECTION).contains(WS_PERMESSAGE_DEFLATE);
+                }
+            }
+
+            context.write(message, promise);
+        }
+    }
+
     //----- Internal Client implementation API
 
     protected ChannelHandler getClientHandler() {
@@ -455,6 +505,10 @@ public final class Netty4Client implements NettyClient {
         if (options.isUseWebSockets()) {
             channel.pipeline().addLast(new HttpClientCodec());
             channel.pipeline().addLast(new HttpObjectAggregator(8192));
+            if (options.isWebSocketCompression()) {
+                channel.pipeline().addLast(new ClientWSCompressionObserver());
+                
channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
+            }
         }
 
         channel.pipeline().addLast(new NettyClientOutboundHandler());
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
index 64df5b0a..94a891dc 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
@@ -43,6 +43,7 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
@@ -59,6 +60,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http.HttpUtil;
@@ -67,6 +69,7 @@ import 
io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
 import 
io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
+import 
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.ssl.SslHandler;
@@ -93,6 +96,8 @@ public final class Netty4Server implements NettyServer {
     private final ProtonTestServerOptions options;
     private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private String webSocketPath = WEBSOCKET_PATH;
+    private boolean wsCompressionRequest;
+    private boolean wsCompressionResponse;
     private volatile SslHandler sslHandler;
     private volatile HandshakeComplete handshakeComplete;
     private final CountDownLatch handshakeCompletion = new CountDownLatch(1);
@@ -141,6 +146,12 @@ public final class Netty4Server implements NettyServer {
         return (((InetSocketAddress) clientChannel.remoteAddress()).getPort());
     }
 
+    @Override
+    public boolean isWSCompressionActive() {
+        // Throws NullPointerException when no client channel has been recorded.
+        Objects.requireNonNull(clientChannel);
+
+        final boolean requested = wsCompressionRequest;
+        final boolean accepted = wsCompressionResponse;
+
+        return requested && accepted;
+    }
+
     @Override
     public boolean isPeerVerified() {
         try {
@@ -264,9 +275,17 @@ public final class Netty4Server implements NettyServer {
                         ch.pipeline().addLast(sslHandler = 
SslSupport.createServerSslHandler(null, options));
                     }
 
+                    if (options.isTraceBytes()) {
+                        ch.pipeline().addLast(new LoggingHandler(getClass()));
+                    }
+
                     if (options.isUseWebSockets()) {
                         ch.pipeline().addLast(new HttpServerCodec());
                         ch.pipeline().addLast(new HttpObjectAggregator(65536));
+                        if (options.isWebSocketCompression()) {
+                            ch.pipeline().addLast(new 
ServerWSCompressionObserver());
+                            ch.pipeline().addLast(new 
WebSocketServerCompressionHandler());
+                        }
                         ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
                     }
 
@@ -513,6 +532,41 @@ public final class Netty4Server implements NettyServer {
         }
     }
 
+    /**
+     * Pipeline handler that looks at the HTTP upgrade exchange and notes whether the
+     * inbound request and the outbound response each list the permessage-deflate
+     * extension. Every message is passed along untouched.
+     */
+    private class ServerWSCompressionObserver extends ChannelDuplexHandler  {
+
+        final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions";
+        final String WS_PERMESSAGE_DEFLATE = "permessage-deflate";
+        final String WS_UPGRADE = "upgrade";
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object message) {
+            if (message instanceof FullHttpRequest) {
+                final HttpHeaders requestHeaders = ((FullHttpRequest) message).headers();
+
+                if (hasUpgradeAndExtensions(requestHeaders)) {
+                    wsCompressionRequest = listsDeflate(requestHeaders);
+                }
+            }
+
+            ctx.fireChannelRead(message);
+        }
+
+        @Override
+        public void write(ChannelHandlerContext context, Object message, ChannelPromise promise) throws Exception {
+            if (message instanceof FullHttpResponse) {
+                final HttpHeaders responseHeaders = ((FullHttpResponse) message).headers();
+
+                if (hasUpgradeAndExtensions(responseHeaders)) {
+                    wsCompressionResponse = listsDeflate(responseHeaders);
+                }
+            }
+
+            context.write(message, promise);
+        }
+
+        // True when both the upgrade header and the extensions header are present.
+        private boolean hasUpgradeAndExtensions(HttpHeaders headers) {
+            return headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION);
+        }
+
+        // True when the extensions header value mentions permessage-deflate.
+        private boolean listsDeflate(HttpHeaders headers) {
+            return headers.get(WS_EXTENSIONS_SECTION).contains(WS_PERMESSAGE_DEFLATE);
+        }
+    }
+
     private static void sendHttpResponse(ChannelHandlerContext ctx, 
FullHttpRequest request, FullHttpResponse response) {
         // Generate an error page if response getStatus code is not OK (200).
         if (response.status().code() != 200) {
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java
index 643374c8..b17209e2 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Client.java
@@ -51,6 +51,7 @@ import io.netty5.channel.SimpleChannelInboundHandler;
 import io.netty5.channel.nio.NioHandler;
 import io.netty5.channel.socket.nio.NioSocketChannel;
 import io.netty5.handler.codec.http.DefaultHttpContent;
+import io.netty5.handler.codec.http.FullHttpRequest;
 import io.netty5.handler.codec.http.FullHttpResponse;
 import io.netty5.handler.codec.http.HttpClientCodec;
 import io.netty5.handler.codec.http.HttpObjectAggregator;
@@ -65,6 +66,7 @@ import 
io.netty5.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import 
io.netty5.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty5.handler.codec.http.websocketx.WebSocketVersion;
+import 
io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
 import io.netty5.handler.logging.LoggingHandler;
 import io.netty5.handler.ssl.SslHandler;
 import io.netty5.util.concurrent.Future;
@@ -87,6 +89,8 @@ public final class Netty5Client implements NettyClient {
     private Channel channel;
     private String host;
     private int port;
+    private boolean wsCompressionRequest;
+    private boolean wsCompressionResponse;
     protected volatile IOException failureCause;
     private final ProtonTestClientOptions options;
     private volatile SslHandler sslHandler;
@@ -222,6 +226,15 @@ public final class Netty5Client implements NettyClient {
         return options.isSecure();
     }
 
+    @Override
+    public boolean isWSCompressionActive() {
+        // Only a live channel can report on the outcome of the upgrade handshake.
+        if (channel != null && channel.isActive()) {
+            return wsCompressionRequest && wsCompressionResponse;
+        }
+
+        throw new IllegalStateException("Channel is not connected or has closed");
+    }
+
     @Override
     public URI getRemoteURI() {
         if (host != null) {
@@ -399,6 +412,41 @@ public final class Netty5Client implements NettyClient {
         }
     }
 
+    /**
+     * Observes the HTTP upgrade exchange passing through the client pipeline and records
+     * whether the outgoing request and the incoming response each list the
+     * permessage-deflate extension. Messages are always forwarded unchanged.
+     */
+    private class ClientWSCompressionObserverHandler extends ChannelHandlerAdapter  {
+
+        final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions";
+        final String WS_PERMESSAGE_DEFLATE = "permessage-deflate";
+        final String WS_UPGRADE = "upgrade";
+
+        /**
+         * Inspects an inbound full HTTP response and, when it carries both the upgrade and
+         * the extensions headers, records whether permessage-deflate is listed.
+         */
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object message) {
+            if (message instanceof FullHttpResponse) {
+                FullHttpResponse response = (FullHttpResponse) message;
+                HttpHeaders headers = response.headers();
+
+                // The response must feed the response flag; writing the request flag here
+                // left wsCompressionResponse permanently false.
+                if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) {
+                    wsCompressionResponse = headers.get(WS_EXTENSIONS_SECTION).toString().contains(WS_PERMESSAGE_DEFLATE);
+                }
+            }
+
+            ctx.fireChannelRead(message);
+        }
+
+        /**
+         * Inspects an outbound full HTTP request and, when it carries both the upgrade and
+         * the extensions headers, records whether permessage-deflate is listed.
+         */
+        @Override
+        public Future<Void> write(ChannelHandlerContext context, Object message) {
+            if (message instanceof FullHttpRequest) {
+                FullHttpRequest request = (FullHttpRequest) message;
+                HttpHeaders headers = request.headers();
+
+                if (headers.contains(WS_UPGRADE) && headers.contains(WS_EXTENSIONS_SECTION)) {
+                    wsCompressionRequest = headers.get(WS_EXTENSIONS_SECTION).toString().contains(WS_PERMESSAGE_DEFLATE);
+                }
+            }
+
+            return context.write(message);
+        }
+    }
+
     //----- Internal Client implementation API
 
     protected ChannelHandler getClientHandler() {
@@ -454,6 +502,10 @@ public final class Netty5Client implements NettyClient {
         if (options.isUseWebSockets()) {
             channel.pipeline().addLast(new HttpClientCodec());
             channel.pipeline().addLast(new 
HttpObjectAggregator<DefaultHttpContent>(8192));
+            if (options.isWebSocketCompression()) {
+                channel.pipeline().addLast(new 
ClientWSCompressionObserverHandler());
+                
channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
+            }
         }
 
         channel.pipeline().addLast(new NettyClientOutboundHandler());
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
index 0148b091..4e08ba49 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
@@ -61,11 +61,13 @@ import io.netty5.handler.codec.http.HttpResponseStatus;
 import io.netty5.handler.codec.http.HttpServerCodec;
 import io.netty5.handler.codec.http.HttpUtil;
 import io.netty5.handler.codec.http.HttpVersion;
+import io.netty5.handler.codec.http.headers.HttpHeaders;
 import io.netty5.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty5.handler.codec.http.websocketx.ContinuationWebSocketFrame;
 import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
 import 
io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakeCompletionEvent;
 import io.netty5.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import 
io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
 import io.netty5.handler.logging.LogLevel;
 import io.netty5.handler.logging.LoggingHandler;
 import io.netty5.handler.ssl.SslHandler;
@@ -92,6 +94,8 @@ public final class Netty5Server implements NettyServer {
     private final ProtonTestServerOptions options;
     private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private String webSocketPath = WEBSOCKET_PATH;
+    private boolean wsCompressionRequest;
+    private boolean wsCompressionResponse;
     private volatile SslHandler sslHandler;
     private volatile WebSocketServerHandshakeCompletionEvent handshakeComplete;
     private final CountDownLatch handshakeCompletion = new CountDownLatch(1);
@@ -139,6 +143,12 @@ public final class Netty5Server implements NettyServer {
         return (((InetSocketAddress) clientChannel.remoteAddress()).getPort());
     }
 
+    @Override
+    public boolean isWSCompressionActive() {
+        // Throws NullPointerException when no client channel has been recorded.
+        Objects.requireNonNull(clientChannel);
+
+        final boolean negotiated = wsCompressionRequest && wsCompressionResponse;
+
+        return negotiated;
+    }
+
     @Override
     public boolean isPeerVerified() {
         try {
@@ -261,6 +271,10 @@ public final class Netty5Server implements NettyServer {
                     if (options.isUseWebSockets()) {
                         ch.pipeline().addLast(new HttpServerCodec());
                         ch.pipeline().addLast(new 
HttpObjectAggregator<DefaultHttpContent>(65536));
+                        if (options.isWebSocketCompression()) {
+                            ch.pipeline().addLast(new 
ServerWSCompressionObserverHandler());
+                            ch.pipeline().addLast(new 
WebSocketServerCompressionHandler());
+                        }
                         ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
                     }
 
@@ -510,6 +524,41 @@ public final class Netty5Server implements NettyServer {
         }
     }
 
+    /**
+     * Pipeline handler that looks at the HTTP upgrade exchange and notes whether the
+     * inbound request and the outbound response each list the permessage-deflate
+     * extension. Every message is passed along untouched.
+     */
+    private class ServerWSCompressionObserverHandler extends ChannelHandlerAdapter  {
+
+        final String WS_EXTENSIONS_SECTION = "sec-websocket-extensions";
+        final String WS_PERMESSAGE_DEFLATE = "permessage-deflate";
+        final String WS_UPGRADE = "upgrade";
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object message) {
+            if (message instanceof FullHttpRequest) {
+                final HttpHeaders requestHeaders = ((FullHttpRequest) message).headers();
+                final boolean upgradeWithExtensions =
+                    requestHeaders.contains(WS_UPGRADE) && requestHeaders.contains(WS_EXTENSIONS_SECTION);
+
+                if (upgradeWithExtensions) {
+                    final String extensions = requestHeaders.get(WS_EXTENSIONS_SECTION).toString();
+                    wsCompressionRequest = extensions.contains(WS_PERMESSAGE_DEFLATE);
+                }
+            }
+
+            ctx.fireChannelRead(message);
+        }
+
+        @Override
+        public Future<Void> write(ChannelHandlerContext context, Object message) {
+            if (message instanceof FullHttpResponse) {
+                final HttpHeaders responseHeaders = ((FullHttpResponse) message).headers();
+                final boolean upgradeWithExtensions =
+                    responseHeaders.contains(WS_UPGRADE) && responseHeaders.contains(WS_EXTENSIONS_SECTION);
+
+                if (upgradeWithExtensions) {
+                    final String extensions = responseHeaders.get(WS_EXTENSIONS_SECTION).toString();
+                    wsCompressionResponse = extensions.contains(WS_PERMESSAGE_DEFLATE);
+                }
+            }
+
+            return context.write(message);
+        }
+    }
+
     private static void sendHttpResponse(ChannelHandlerContext ctx, 
FullHttpRequest request, FullHttpResponse response) {
         // Generate an error page if response getStatus code is not OK (200).
         if (response.status().code() != 200) {
diff --git 
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java
 
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java
index 458b4761..cc9942b5 100644
--- 
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java
+++ 
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestWSClientTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.protonj2.test.driver;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
@@ -95,4 +97,63 @@ class ProtonTestWSClientTest extends TestPeerTestsBase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    /**
+     * Both peers enable WS compression.
+     */
+    @Test
+    public void testClientAndServerEnabledWSCompressionCanConnect() throws Exception {
+        doTestClientAndServerWSCompressionNegotiation(true, true);
+    }
+
+    /**
+     * Neither peer enables WS compression.
+     */
+    @Test
+    public void testClientAndServerDisableWSCompressionCanConnect() throws Exception {
+        doTestClientAndServerWSCompressionNegotiation(false, false);
+    }
+
+    /**
+     * Only the client enables WS compression; the server leaves it disabled.
+     */
+    @Test
+    public void testClientEnablesAndServerDisableWSCompressionCanConnect() throws Exception {
+        // Helper arguments are (server, client): server off, client on, as the test name states.
+        doTestClientAndServerWSCompressionNegotiation(false, true);
+    }
+
+    /**
+     * Only the server enables WS compression; the client leaves it disabled.
+     */
+    @Test
+    public void testClientDisablesAndServerEnablesWSCompressionCanConnect() throws Exception {
+        // Helper arguments are (server, client): server on, client off, as the test name states.
+        doTestClientAndServerWSCompressionNegotiation(true, false);
+    }
+
+    /**
+     * Connects a WebSocket test client to a WebSocket test server, runs a minimal
+     * header / open / close exchange and then checks that both peers report WS
+     * compression as active only when both sides enabled it.
+     *
+     * @param serverWSCompression
+     *      value applied to the server options' WebSocket compression setting.
+     * @param clientWSCompression
+     *      value applied to the client options' WebSocket compression setting.
+     *
+     * @throws Exception if any step of the exchange or an assertion fails.
+     */
+    private void doTestClientAndServerWSCompressionNegotiation(boolean serverWSCompression, boolean clientWSCompression) throws Exception {
+        ProtonTestServerOptions serverOpts = new ProtonTestServerOptions();
+        serverOpts.setUseWebSockets(true);
+        serverOpts.setWebSocketCompression(serverWSCompression);
+
+        ProtonTestClientOptions clientOpts = new ProtonTestClientOptions();
+        clientOpts.setUseWebSockets(true);
+        clientOpts.setWebSocketCompression(clientWSCompression);
+
+        try (ProtonTestServer peer = new ProtonTestServer(serverOpts)) {
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectClose().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            // Log before connecting so the message marks the start of the exchange.
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            ProtonTestClient client = new ProtonTestClient(clientOpts);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+
+            try {
+                client.expectAMQPHeader();
+                client.expectOpen();
+                client.expectClose();
+                client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+                client.remoteOpen().now();
+                client.remoteClose().now();
+                client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+                // Compression is only negotiated when both sides enabled it; the state is
+                // queried while the client is still connected.
+                assertEquals(serverWSCompression && clientWSCompression, peer.isWSCompressionActive());
+                assertEquals(serverWSCompression && clientWSCompression, client.isWSCompressionActive());
+            } finally {
+                // Release the client even when an expectation or assertion above fails.
+                client.close();
+            }
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to