This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 54d10d37 PROTON-2571 Flush netty only on read completion
54d10d37 is described below

commit 54d10d37597782c461046c0c8f0123237b8b3d53
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Jun 27 11:12:26 2022 -0400

    PROTON-2571 Flush netty only on read completion
    
    During a read operation that might cause more than one channel
    write we can save the overhead of multople forced flush operations
    by hooking the channel read complete event point to initiate a
    final flush operation for all write inside the read event.
---
 .../apache/qpid/protonj2/client/impl/ClientConnection.java    | 10 ++++++++--
 .../org/apache/qpid/protonj2/client/impl/ClientSender.java    | 11 +++++------
 .../apache/qpid/protonj2/client/impl/ClientStreamSender.java  |  8 +++++---
 .../qpid/protonj2/client/impl/ClientTransportListener.java    |  7 ++++++-
 .../apache/qpid/protonj2/client/transport/TcpTransport.java   |  5 +++++
 5 files changed, 29 insertions(+), 12 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index a1546ebe..09f59556 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -615,8 +615,10 @@ public final class ClientConnection implements Connection {
         });
     }
 
-    void autoFlushOff() {
+    boolean autoFlushOff() {
+        boolean oldState = autoFlush;
         autoFlush = false;
+        return oldState;
     }
 
     void autoFlushOn() {
@@ -781,6 +783,10 @@ public final class ClientConnection implements Connection {
             protonConnection.close();
         } catch (Exception ignore) {}
 
+        try {
+            transport.flush();
+        } catch (Exception ignore) {}
+
         try {
             transport.close();
         } catch (Exception ignore) {}
@@ -976,7 +982,7 @@ public final class ClientConnection implements Connection {
             reconnectAttempts++;
             transport = ioContext.newTransport();
             LOG.trace("Connection {} Attempting connection to remote {}:{}", 
getId(), location.getHost(), location.getPort());
-            transport.connect(location.getHost(), location.getPort(), new 
ClientTransportListener(engine));
+            transport.connect(location.getHost(), location.getPort(), new 
ClientTransportListener(this, engine));
         } catch (Throwable error) {
             
engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(error));
         }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 7ff4849b..3d73c99a 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -337,14 +337,11 @@ public final class ClientSender extends 
ClientSenderLinkType<Sender> implements
             if (delivery == null) {
                 delivery = sender.protonLink().next();
                 delivery.setLinkedResource(sender.createTracker(delivery));
-            }
-
-            if (delivery.getTransferCount() == 0) {
                 delivery.setMessageFormat(messageFormat);
                 delivery.disposition(state, settled);
             }
 
-            sender.connection().autoFlushOff();
+            boolean wasAutoFlushOn = sender.connection().autoFlushOff();
             try {
                 delivery.streamBytes(payload, true);
                 if (payload != null && payload.isReadable()) {
@@ -352,9 +349,11 @@ public final class ClientSender extends 
ClientSenderLinkType<Sender> implements
                 } else {
                     succeeded();
                 }
-                sender.connection().flush();
             } finally {
-                sender.connection().autoFlushOn();
+                if (wasAutoFlushOn) {
+                    sender.connection().flush();
+                    sender.connection().autoFlushOn();
+                }
             }
         }
 
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 511f9028..2e0ef82a 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -518,7 +518,7 @@ public final class ClientStreamSender extends 
ClientSenderLinkType<StreamSender>
                 delivery.abort();
                 succeeded();
             } else {
-                sender.connection().autoFlushOff();
+                boolean wasAutoFlushOn = sender.connection().autoFlushOff();
                 try {
                     delivery.streamBytes(payload, complete);
                     if (payload != null && payload.isReadable()) {
@@ -526,9 +526,11 @@ public final class ClientStreamSender extends 
ClientSenderLinkType<StreamSender>
                     } else {
                         succeeded();
                     }
-                    sender.connection().flush();
                 } finally {
-                    sender.connection().autoFlushOn();
+                    if (wasAutoFlushOn) {
+                        sender.connection().flush();
+                        sender.connection().autoFlushOn();
+                    }
                 }
             }
         }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
index 84049dfe..0345e5a5 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
@@ -34,9 +34,11 @@ final class ClientTransportListener implements 
TransportListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClientTransportListener.class);
 
     private final Engine engine;
+    private final ClientConnection connection;
 
-    ClientTransportListener(Engine engine) {
+    ClientTransportListener(ClientConnection connection, Engine engine) {
         this.engine = engine;
+        this.connection = connection;
     }
 
     @Override
@@ -52,6 +54,7 @@ final class ClientTransportListener implements 
TransportListener {
     @Override
     public void transportRead(ProtonBuffer incoming) {
         try {
+            connection.autoFlushOff();
             do {
                 engine.ingest(incoming);
             } while (incoming.isReadable() && engine.isWritable());
@@ -59,6 +62,8 @@ final class ClientTransportListener implements 
TransportListener {
         } catch (EngineStateException e) {
             LOG.warn("Caught problem during incoming data processing: {}", 
e.getMessage(), e);
             
engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(e));
+        } finally {
+            connection.autoFlushOn();
         }
     }
 
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
index a1790751..119467cf 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
@@ -450,6 +450,11 @@ public class TcpTransport implements Transport {
             }
         }
 
+        @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) throws 
Exception {
+            ctx.flush();
+        }
+
         @Override
         public void channelInactive(ChannelHandlerContext context) throws 
Exception {
             handleTransportFailure(context.channel(), new IOException("Remote 
closed connection unexpectedly"));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to