This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 0659f18f8d JAMES-3870 Group IMAP response line within TCP packets (#1364) 0659f18f8d is described below commit 0659f18f8d624ba118903ca124b07bd730e89957 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Tue Jan 3 10:48:14 2023 +0700 JAMES-3870 Group IMAP response line within TCP packets (#1364) Today each IMAP response line is transmitted in a distinct TCP packet. This causes a lot of network overhead because: => TCP headers are added for each response line. To give an idea, a LIST response line is 35 bytes long but results in a 101-byte TCP frame, i.e. a 188% overhead. => TCP ACKs are sent for each line independently. An ACK is 66 bytes. This is especially problematic for LIST and FETCH commands, which can result in many (100, 1000, maybe millions) response lines. We should try to limit the calls to "flush" with Netty, and force the flush only once per IMAP command (at the end of processing). Netty is free to transmit some data earlier if its buffer state requires it. With this change, the entire mailbox list takes 1 packet to transmit (1420 bytes total for 41 mailboxes, so ~35 bytes per mailbox) and a single ACK (66 bytes). 
--- .../apache/james/imap/api/process/ImapSession.java | 4 ++++ .../apache/james/imap/processor/IdleProcessor.java | 5 +++-- .../imapserver/netty/ChannelImapResponseWriter.java | 12 +++++++----- .../netty/ImapChannelUpstreamHandler.java | 21 ++++++++++++++------- .../imapserver/netty/ImapLineHandlerAdapter.java | 1 + .../james/imapserver/netty/NettyImapSession.java | 7 +++++++ 6 files changed, 36 insertions(+), 14 deletions(-) diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java index 0e384c755b..35224b7ceb 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java @@ -263,4 +263,8 @@ public interface ImapSession extends CommandDetectionSession { } void schedule(Runnable runnable, Duration waitDelay); + + default void flush() { + + } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java index a8cb8d5f53..b6fdf32eaa 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java @@ -97,7 +97,6 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme if (sm != null) { sm.unregisterIdle(); } - session1.popLineHandler(); if (!DONE.equals(line.toUpperCase(Locale.US))) { String message = String.format("Continuation for IMAP IDLE was not understood. 
Expected 'DONE', got '%s'.", line); StatusResponse response = getStatusResponseFactory() @@ -109,6 +108,7 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme } else { okComplete(request, responder); } + session1.popLineHandler(); idleActive.set(false); }); @@ -167,7 +167,8 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme @Override public Publisher<Void> reactiveEvent(Event event) { - return unsolicitedResponses(session, responder, false); + return unsolicitedResponses(session, responder, false) + .then(Mono.fromRunnable(session::flush)); } @Override diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java index 399b3f6f60..78d96fe75e 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java @@ -57,7 +57,7 @@ public class ChannelImapResponseWriter implements ImapResponseWriter { @Override public void write(byte[] buffer) throws IOException { if (channel.isActive()) { - channel.writeAndFlush(Unpooled.wrappedBuffer(buffer)); + channel.write(Unpooled.wrappedBuffer(buffer)); } } @@ -72,16 +72,18 @@ public class ChannelImapResponseWriter implements ImapResponseWriter { // See JAMES-1305 and JAMES-1306 ChannelPipeline cp = channel.pipeline(); if (zeroCopy && cp.get(SslHandler.class) == null && cp.get(ZlibEncoder.class) == null) { - channel.writeAndFlush(new DefaultFileRegion(fc, fc.position(), literal.size())); + channel.write(new DefaultFileRegion(fc, fc.position(), literal.size())); } else { - channel.writeAndFlush(new ChunkedNioFile(fc, 8192)); + channel.write(new ChunkedNioFile(fc, 8192)); } } else { - channel.writeAndFlush(new 
ChunkedStream(in)); + channel.write(new ChunkedStream(in)); } } } - + public void flush() { + channel.flush(); + } } diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java index b0dd0e66f6..d451b4ed0a 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java @@ -181,10 +181,11 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp LOGGER.info("Connection established from {}", address.getAddress().getHostAddress()); imapConnectionsMetric.increment(); - ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel())); - + ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); + ImapResponseComposer response = new ImapResponseComposerImpl(writer); // write hello to client response.untagged().message("OK").message(hello).end(); + writer.flush(); super.channelActive(ctx); } @@ -249,8 +250,10 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp // command length." // // See also JAMES-1190 - ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel())); + ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); + ImapResponseComposer response = new ImapResponseComposerImpl(writer); response.untaggedResponse(ImapConstants.BAD + " failed. 
Maximum command line length exceeded"); + writer.flush(); } else if (cause instanceof ReactiveThrottler.RejectedException) { manageRejectedException(ctx, (ReactiveThrottler.RejectedException) cause); @@ -263,10 +266,12 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp private void manageRejectedException(ChannelHandlerContext ctx, ReactiveThrottler.RejectedException cause) { if (cause.getImapMessage() instanceof AbstractImapRequest) { AbstractImapRequest req = (AbstractImapRequest) cause.getImapMessage(); - ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel())); - + ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); + ImapResponseComposer response = new ImapResponseComposerImpl(writer); new ResponseEncoder(encoder, response) - .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(), new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null)); + .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(), + new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null)); + writer.flush(); } else { manageUnknownError(ctx); } @@ -306,7 +311,8 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp imapCommandsMetric.increment(); ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY); - ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel())); + ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); + ImapResponseComposer response = new ImapResponseComposerImpl(writer); ImapMessage message = (ImapMessage) msg; beforeIDLEUponProcessing(ctx); @@ -344,6 +350,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp 
ctx.fireExceptionCaught(signal.getThrowable()); } disposableAttribute.set(null); + writer.flush(); ctx.fireChannelReadComplete(); })) .contextWrite(ReactorUtils.context("imap", mdc(ctx))), message) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java index c03e8d76ee..59febae27b 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java @@ -49,6 +49,7 @@ public class ImapLineHandlerAdapter extends ChannelInboundHandlerAdapter { byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); lineHandler.onLine(session, data); + ctx.channel().flush(); } } diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java index 6c8b228d10..611e84561d 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java @@ -183,6 +183,7 @@ public class NettyImapSession implements ImapSession, NettyConstants { } executeSafely(() -> { runnable.run(); + channel.flush(); channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler()); stopDetectingCommandInjection(); }); @@ -229,6 +230,7 @@ public class NettyImapSession implements ImapSession, NettyConstants { executeSafely(() -> { runnable.run(); + channel.flush(); ZlibDecoder decoder = new JZlibDecoder(ZlibWrapper.NONE); ZlibEncoder encoder = new JZlibEncoder(ZlibWrapper.NONE, 5); @@ -307,4 +309,9 @@ public class NettyImapSession implements ImapSession, 
NettyConstants { public void schedule(Runnable runnable, Duration waitDelay) { channel.eventLoop().schedule(runnable, waitDelay.toMillis(), TimeUnit.MILLISECONDS); } + + @Override + public void flush() { + channel.flush(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org