This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 54d10d37 PROTON-2571 Flush netty only on read completion
54d10d37 is described below
commit 54d10d37597782c461046c0c8f0123237b8b3d53
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Jun 27 11:12:26 2022 -0400
PROTON-2571 Flush netty only on read completion
During a read operation that might cause more than one channel
write we can save the overhead of multople forced flush operations
by hooking the channel read complete event point to initiate a
final flush operation for all write inside the read event.
---
.../apache/qpid/protonj2/client/impl/ClientConnection.java | 10 ++++++++--
.../org/apache/qpid/protonj2/client/impl/ClientSender.java | 11 +++++------
.../apache/qpid/protonj2/client/impl/ClientStreamSender.java | 8 +++++---
.../qpid/protonj2/client/impl/ClientTransportListener.java | 7 ++++++-
.../apache/qpid/protonj2/client/transport/TcpTransport.java | 5 +++++
5 files changed, 29 insertions(+), 12 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index a1546ebe..09f59556 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -615,8 +615,10 @@ public final class ClientConnection implements Connection {
});
}
- void autoFlushOff() {
+ boolean autoFlushOff() {
+ boolean oldState = autoFlush;
autoFlush = false;
+ return oldState;
}
void autoFlushOn() {
@@ -781,6 +783,10 @@ public final class ClientConnection implements Connection {
protonConnection.close();
} catch (Exception ignore) {}
+ try {
+ transport.flush();
+ } catch (Exception ignore) {}
+
try {
transport.close();
} catch (Exception ignore) {}
@@ -976,7 +982,7 @@ public final class ClientConnection implements Connection {
reconnectAttempts++;
transport = ioContext.newTransport();
LOG.trace("Connection {} Attempting connection to remote {}:{}",
getId(), location.getHost(), location.getPort());
- transport.connect(location.getHost(), location.getPort(), new
ClientTransportListener(engine));
+ transport.connect(location.getHost(), location.getPort(), new
ClientTransportListener(this, engine));
} catch (Throwable error) {
engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(error));
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 7ff4849b..3d73c99a 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -337,14 +337,11 @@ public final class ClientSender extends
ClientSenderLinkType<Sender> implements
if (delivery == null) {
delivery = sender.protonLink().next();
delivery.setLinkedResource(sender.createTracker(delivery));
- }
-
- if (delivery.getTransferCount() == 0) {
delivery.setMessageFormat(messageFormat);
delivery.disposition(state, settled);
}
- sender.connection().autoFlushOff();
+ boolean wasAutoFlushOn = sender.connection().autoFlushOff();
try {
delivery.streamBytes(payload, true);
if (payload != null && payload.isReadable()) {
@@ -352,9 +349,11 @@ public final class ClientSender extends
ClientSenderLinkType<Sender> implements
} else {
succeeded();
}
- sender.connection().flush();
} finally {
- sender.connection().autoFlushOn();
+ if (wasAutoFlushOn) {
+ sender.connection().flush();
+ sender.connection().autoFlushOn();
+ }
}
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 511f9028..2e0ef82a 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -518,7 +518,7 @@ public final class ClientStreamSender extends
ClientSenderLinkType<StreamSender>
delivery.abort();
succeeded();
} else {
- sender.connection().autoFlushOff();
+ boolean wasAutoFlushOn = sender.connection().autoFlushOff();
try {
delivery.streamBytes(payload, complete);
if (payload != null && payload.isReadable()) {
@@ -526,9 +526,11 @@ public final class ClientStreamSender extends
ClientSenderLinkType<StreamSender>
} else {
succeeded();
}
- sender.connection().flush();
} finally {
- sender.connection().autoFlushOn();
+ if (wasAutoFlushOn) {
+ sender.connection().flush();
+ sender.connection().autoFlushOn();
+ }
}
}
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
index 84049dfe..0345e5a5 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
@@ -34,9 +34,11 @@ final class ClientTransportListener implements
TransportListener {
private static final Logger LOG =
LoggerFactory.getLogger(ClientTransportListener.class);
private final Engine engine;
+ private final ClientConnection connection;
- ClientTransportListener(Engine engine) {
+ ClientTransportListener(ClientConnection connection, Engine engine) {
this.engine = engine;
+ this.connection = connection;
}
@Override
@@ -52,6 +54,7 @@ final class ClientTransportListener implements
TransportListener {
@Override
public void transportRead(ProtonBuffer incoming) {
try {
+ connection.autoFlushOff();
do {
engine.ingest(incoming);
} while (incoming.isReadable() && engine.isWritable());
@@ -59,6 +62,8 @@ final class ClientTransportListener implements
TransportListener {
} catch (EngineStateException e) {
LOG.warn("Caught problem during incoming data processing: {}",
e.getMessage(), e);
engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(e));
+ } finally {
+ connection.autoFlushOn();
}
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
index a1790751..119467cf 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
@@ -450,6 +450,11 @@ public class TcpTransport implements Transport {
}
}
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws
Exception {
+ ctx.flush();
+ }
+
@Override
public void channelInactive(ChannelHandlerContext context) throws
Exception {
handleTransportFailure(context.channel(), new IOException("Remote
closed connection unexpectedly"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]