This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new d53b7ad4c3 JAMES-3870 Prevent partial flush in IMAP (#1432) d53b7ad4c3 is described below commit d53b7ad4c3016974f3afa3bc9fc555b7e4dff14e Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Tue Feb 14 07:11:51 2023 +0700 JAMES-3870 Prevent partial flush in IMAP (#1432) Some clients like evolution do not tolerate IMAP messages split across 2 IMAP frames --- .../james/imap/api/process/ImapProcessor.java | 2 ++ .../apache/james/imap/api/process/ImapSession.java | 3 --- .../imap/decode/main/ImapRequestStreamHandler.java | 9 +++++++- .../main/OutputStreamImapResponseWriter.java | 13 +++++++++++ .../james/imap/encode/ImapResponseComposer.java | 2 ++ .../imap/encode/base/ImapResponseComposerImpl.java | 21 +++++++++++++---- .../imap/main/AbstractImapRequestHandler.java | 5 +++++ .../apache/james/imap/main/ResponseEncoder.java | 9 ++++++++ .../imap/processor/AbstractAuthProcessor.java | 2 ++ .../imap/processor/AuthenticateProcessor.java | 4 ++++ .../james/imap/processor/CompressProcessor.java | 9 +++++++- .../apache/james/imap/processor/IdleProcessor.java | 4 +++- .../james/imap/processor/LogoutProcessor.java | 3 +-- .../james/imap/processor/StartTLSProcessor.java | 5 ++++- .../encode/FetchResponseEncoderEnvelopeTest.java | 19 +++++++++++++++- .../FetchResponseEncoderNoExtensionsTest.java | 4 ++++ .../imap/encode/FetchResponseEncoderTest.java | 3 +++ .../imap/encode/ImapResponseComposerImplTest.java | 1 + .../james/imap/encode/LSubResponseEncoderTest.java | 1 + .../james/imap/encode/ListResponseEncoderTest.java | 1 + .../imap/encode/ListingEncodingUtilsTest.java | 9 ++++++++ .../encode/MailboxStatusResponseEncoderTest.java | 1 + .../imap/encode/MetadataResponseEncoderTest.java | 5 +++++ .../imap/encode/QuotaResponseEncoderTest.java | 2 ++ .../imap/encode/QuotaRootResponseEncoderTest.java | 1 + .../imap/encode/SearchResponseEncoderTest.java | 1 + .../imap/encode/XListResponseEncoderTest.java | 1 + .../james/imap/processor/SelectProcessorTest.java | 4 +++- .../java/org/apache/james/IMAPIntegrationTest.java | 2 +- .../netty/ChannelImapResponseWriter.java | 26 +++++++++++++++++----- .../netty/ImapChannelUpstreamHandler.java | 20 ++++++++--------- .../james/imapserver/netty/NettyImapSession.java | 7 ------ .../james/imapserver/netty/IMAPServerTest.java | 2 +- 33 files changed, 161 insertions(+), 40 deletions(-) diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapProcessor.java index 82ae5ad2c6..5917f6c81a 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapProcessor.java @@ -65,5 +65,7 @@ public interface ImapProcessor { * @param message <code>not null</code> */ void respond(ImapResponseMessage message); + + void flush(); } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java index 35224b7ceb..81a93d0194 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java @@ -264,7 +264,4 @@ public interface ImapSession extends CommandDetectionSession { void schedule(Runnable runnable, Duration waitDelay); - default void flush() { - - } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/decode/main/ImapRequestStreamHandler.java b/protocols/imap/src/main/java/org/apache/james/imap/decode/main/ImapRequestStreamHandler.java index 3b498bd687..07744fff34 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/decode/main/ImapRequestStreamHandler.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/decode/main/ImapRequestStreamHandler.java @@ -74,7 +74,9 @@ public final class ImapRequestStreamHandler extends AbstractImapRequestHandler { return false; } - ImapResponseComposerImpl response = new ImapResponseComposerImpl(new OutputStreamImapResponseWriter(output)); + OutputStreamImapResponseWriter writer = new OutputStreamImapResponseWriter(output); + ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); + writer.setFlushCallback(response::flush); if (doProcessRequest(request, response, session)) { @@ -97,6 +99,11 @@ public final class ImapRequestStreamHandler extends AbstractImapRequestHandler { result = false; abandon(output, session); } + try { + response.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } } return result; } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/decode/main/OutputStreamImapResponseWriter.java b/protocols/imap/src/main/java/org/apache/james/imap/decode/main/OutputStreamImapResponseWriter.java index d2510fe227..dcf87defc0 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/decode/main/OutputStreamImapResponseWriter.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/decode/main/OutputStreamImapResponseWriter.java @@ -32,16 +32,29 @@ import org.apache.james.imap.message.Literal; * client. */ public class OutputStreamImapResponseWriter implements ImapResponseWriter { + @FunctionalInterface + interface FlushCallback { + void run() throws IOException; + } public static final int BUFFER_SIZE = 1024; private final OutputStream output; + private FlushCallback flushCallback; public OutputStreamImapResponseWriter(OutputStream output) { this.output = output; + this.flushCallback = () -> { + + }; + } + + public void setFlushCallback(FlushCallback flushCallback) { + this.flushCallback = flushCallback; } @Override public void write(Literal literal) throws IOException { + flushCallback.run(); try (InputStream in = literal.getInputStream()) { IOUtils.copy(in, output, BUFFER_SIZE); } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/encode/ImapResponseComposer.java b/protocols/imap/src/main/java/org/apache/james/imap/encode/ImapResponseComposer.java index 376a5d05d1..f53f66787d 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/encode/ImapResponseComposer.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/encode/ImapResponseComposer.java @@ -32,6 +32,8 @@ import org.apache.james.imap.message.Literal; public interface ImapResponseComposer { + void flush() throws IOException; + /** * Writes an untagged NO response. Indicates that a warning. The command may * still complete sucessfully. diff --git a/protocols/imap/src/main/java/org/apache/james/imap/encode/base/ImapResponseComposerImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/encode/base/ImapResponseComposerImpl.java index 06dcf22ad9..56700dcb19 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/encode/base/ImapResponseComposerImpl.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/encode/base/ImapResponseComposerImpl.java @@ -22,6 +22,7 @@ package org.apache.james.imap.encode.base; import static java.nio.charset.StandardCharsets.US_ASCII; import java.io.IOException; +import java.util.Optional; import javax.mail.Flags; @@ -52,6 +53,10 @@ public class ImapResponseComposerImpl implements ImapConstants, ImapResponseComp private static final byte[] DRAFT = "\\Draft".getBytes(US_ASCII); private static final byte[] DELETED = "\\Deleted".getBytes(US_ASCII); private static final byte[] ANSWERED = "\\Answered".getBytes(US_ASCII); + private static final int FLUSH_BUFFER_SIZE = Optional.ofNullable(System.getProperty("james.imap.flush.buffer.size")) + .map(Integer::parseInt) + .orElse(8192); + private static final byte[] CONTINUATION_BYTES = "+\r\n".getBytes(US_ASCII); private final ImapResponseWriter writer; @@ -90,8 +95,8 @@ public class ImapResponseComposerImpl implements ImapConstants, ImapResponseComp @Override public ImapResponseComposer continuationResponse() throws IOException { - buffer.write(CONTINUATION); - end(); + flush(); + writer.write(CONTINUATION_BYTES); return this; } @@ -134,11 +139,19 @@ public class ImapResponseComposerImpl implements ImapConstants, ImapResponseComp @Override public ImapResponseComposer end() throws IOException { buffer.write(LINE_END_BYTES); - writer.write(buffer.toByteArray()); - buffer.reset(); + if (buffer.size() > FLUSH_BUFFER_SIZE) { + flush(); + } return this; } + public void flush() throws IOException { + if (buffer.size() > 0) { + writer.write(buffer.toByteArray()); + buffer.reset(); + } + } + @Override public ImapResponseComposer tag(Tag tag) throws IOException { writeASCII(tag.asString()); diff --git a/protocols/imap/src/main/java/org/apache/james/imap/main/AbstractImapRequestHandler.java b/protocols/imap/src/main/java/org/apache/james/imap/main/AbstractImapRequestHandler.java index 0bc421d1bc..18a020d193 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/main/AbstractImapRequestHandler.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/main/AbstractImapRequestHandler.java @@ -88,6 +88,11 @@ public abstract class AbstractImapRequestHandler { public void respond(ImapResponseMessage message) { // Swallow } + + @Override + public void flush() { + + } } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/main/ResponseEncoder.java b/protocols/imap/src/main/java/org/apache/james/imap/main/ResponseEncoder.java index 7e74dd2f02..6524bd7de4 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/main/ResponseEncoder.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/main/ResponseEncoder.java @@ -55,4 +55,13 @@ public class ResponseEncoder implements Responder { return failure; } + + @Override + public void flush() { + try { + composer.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractAuthProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractAuthProcessor.java index df82824b18..16e0226d23 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractAuthProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractAuthProcessor.java @@ -78,6 +78,7 @@ public abstract class AbstractAuthProcessor<R extends ImapRequest> extends Abstr session.setMailboxSession(mailboxSession); provisionInbox(session, mailboxManager, mailboxSession); okComplete(request, responder); + responder.flush(); session.stopDetectingCommandInjection(); } catch (BadCredentialsException e) { authFailure = true; @@ -89,6 +90,7 @@ public abstract class AbstractAuthProcessor<R extends ImapRequest> extends Abstr } catch (MailboxException e) { LOGGER.error("Error encountered while login", e); no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING); + responder.flush(); } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java index cfab921d05..fde4363f8b 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java @@ -93,10 +93,12 @@ public class AuthenticateProcessor extends AbstractAuthProcessor<AuthenticateReq } else { session.executeSafely(() -> { responder.respond(new AuthenticateResponse()); + responder.flush(); session.pushLineHandler((requestSession, data) -> { doPlainAuth(extractInitialClientResponse(data), requestSession, request, responder); // remove the handler now requestSession.popLineHandler(); + responder.flush(); }); }); } @@ -108,9 +110,11 @@ public class AuthenticateProcessor extends AbstractAuthProcessor<AuthenticateReq } else { session.executeSafely(() -> { responder.respond(new AuthenticateResponse()); + responder.flush(); session.pushLineHandler((requestSession, data) -> { doOAuth(extractInitialClientResponse(data), requestSession, request, responder); requestSession.popLineHandler(); + responder.flush(); }); }); } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java index 42b774f6ad..f2d3dd8b87 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java @@ -62,7 +62,7 @@ public class CompressProcessor extends AbstractProcessor<CompressRequest> implem } else { StatusResponse response = factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.DEFLATE_ACTIVE); - if (session.startCompression(() -> responder.respond(response))) { + if (activateCompression(responder, session, response)) { session.setAttribute(COMPRESSED, true); } } @@ -73,6 +73,13 @@ public class CompressProcessor extends AbstractProcessor<CompressRequest> implem }); } + private boolean activateCompression(Responder responder, ImapSession session, StatusResponse response) { + return session.startCompression(() -> { + responder.respond(response); + responder.flush(); + }); + } + @Override public List<Capability> getImplementedCapabilities(ImapSession session) { if (session.isCompressionSupported()) { diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java index b6fdf32eaa..f1489a31cd 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java @@ -105,8 +105,10 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme "failed. " + message)); LOGGER.info(message); responder.respond(response); + responder.flush(); } else { okComplete(request, responder); + responder.flush(); } session1.popLineHandler(); idleActive.set(false); @@ -168,7 +170,7 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme @Override public Publisher<Void> reactiveEvent(Event event) { return unsolicitedResponses(session, responder, false) - .then(Mono.fromRunnable(session::flush)); + .then(Mono.fromRunnable(responder::flush)); } @Override diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/LogoutProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/LogoutProcessor.java index 9cb3cacceb..a24df271c7 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/LogoutProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/LogoutProcessor.java @@ -25,7 +25,6 @@ import org.apache.james.imap.api.message.response.StatusResponseFactory; import org.apache.james.imap.api.process.ImapSession; import org.apache.james.imap.message.request.LogoutRequest; import org.apache.james.mailbox.MailboxManager; -import org.apache.james.mailbox.MailboxSession; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; @@ -40,11 +39,11 @@ public class LogoutProcessor extends AbstractMailboxProcessor<LogoutRequest> { @Override protected Mono<Void> processRequestReactive(LogoutRequest request, ImapSession session, Responder responder) { - MailboxSession mailboxSession = session.getMailboxSession(); return session.logout() .then(Mono.fromRunnable(() -> { bye(responder); okComplete(request, responder); + responder.flush(); })); } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java index fabe1366c4..9f2eb850f0 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java @@ -53,7 +53,10 @@ public class StartTLSProcessor extends AbstractProcessor<StartTLSRequest> implem protected Mono<Void> doProcess(StartTLSRequest request, Responder responder, ImapSession session) { return Mono.fromRunnable(() -> { if (session.supportStartTLS()) { - session.startTLS(() -> responder.respond(factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.STARTTLS))); + session.startTLS(() -> { + responder.respond(factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.STARTTLS)); + responder.flush(); + }); } else { responder.respond(factory.taggedBad(request.getTag(), request.getCommand(), HumanReadableText.UNKNOWN_COMMAND)); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderEnvelopeTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderEnvelopeTest.java index 1198141fbf..d8890ae665 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderEnvelopeTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderEnvelopeTest.java @@ -142,7 +142,8 @@ public class FetchResponseEncoderEnvelopeTest { void testShouldNilAllNullProperties() throws Exception { envelopExpects(); encoder.encode(message, composer); - + + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL NIL NIL NIL NIL NIL))\r\n"); } @@ -153,6 +154,7 @@ public class FetchResponseEncoderEnvelopeTest { encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (\"a date\" NIL NIL NIL NIL NIL NIL NIL NIL NIL))\r\n"); } @@ -163,6 +165,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL \"some subject\" NIL NIL NIL NIL NIL NIL NIL NIL))\r\n"); } @@ -173,6 +176,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL NIL NIL NIL \"some reply to\" NIL))\r\n"); } @@ -182,6 +186,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL NIL NIL NIL NIL \"some message id\"))\r\n"); } @@ -191,6 +196,7 @@ public class FetchResponseEncoderEnvelopeTest { from = mockOneAddress(); envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")) NIL NIL NIL NIL NIL NIL NIL))\r\n"); } @@ -201,6 +207,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")(\"2NAME\" \"2DOMAIN LIST\" \"2MAILBOX\" \"2HOST\")) NIL NIL NIL NIL NIL NIL NIL))\r\n"); } @@ -211,6 +218,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")) NIL NIL NIL NIL NIL NIL))\r\n"); } @@ -221,6 +229,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")(\"2NAME\" \"2DOMAIN LIST\" \"2MAILBOX\" \"2HOST\")) NIL NIL NIL NIL NIL NIL))\r\n"); } @@ -232,6 +241,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")) NIL NIL NIL NIL NIL))\r\n"); } @@ -242,6 +252,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")(\"2NAME\" \"2DOMAIN LIST\" \"2MAILBOX\" \"2HOST\")) NIL NIL NIL NIL NIL))\r\n"); } @@ -252,6 +263,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")) NIL NIL NIL NIL))\r\n"); } @@ -262,6 +274,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")(\"2NAME\" \"2DOMAIN LIST\" \"2MAILBOX\" \"2HOST\")) NIL NIL NIL NIL))\r\n"); } @@ -272,6 +285,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")) NIL NIL NIL))\r\n"); } @@ -282,6 +296,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")(\"2NAME\" \"2DOMAIN LIST\" \"2MAILBOX\" \"2HOST\")) NIL NIL NIL))\r\n"); } @@ -292,6 +307,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")) NIL NIL))\r\n"); } @@ -302,6 +318,7 @@ public class FetchResponseEncoderEnvelopeTest { envelopExpects(); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (ENVELOPE (NIL NIL NIL NIL NIL NIL NIL ((\"NAME\" \"DOMAIN LIST\" \"MAILBOX\" \"HOST\")(\"2NAME\" \"2DOMAIN LIST\" \"2MAILBOX\" \"2HOST\")) NIL NIL))\r\n"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderNoExtensionsTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderNoExtensionsTest.java index 6930d6ca5c..ef1947adaa 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderNoExtensionsTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderNoExtensionsTest.java @@ -60,6 +60,7 @@ class FetchResponseEncoderNoExtensionsTest { FetchResponse message = new FetchResponse(MSN, flags, null, null, null, null, null, null, null, null, null, null, null); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (FLAGS (\\Deleted))\r\n"); } @@ -68,6 +69,7 @@ class FetchResponseEncoderNoExtensionsTest { FetchResponse message = new FetchResponse(MSN, null, MessageUid.of(72), null, null, null, null, null, null, null, null, null, null); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (UID 72)\r\n"); } @@ -77,6 +79,7 @@ class FetchResponseEncoderNoExtensionsTest { FetchResponse message = new FetchResponse(MSN, flags, MessageUid.of(72), null, null, null, null, null, null, null, null, null, null); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (FLAGS (\\Deleted) UID 72)\r\n"); } @@ -101,6 +104,7 @@ class FetchResponseEncoderNoExtensionsTest { when(stubStructure.getDescription()).thenReturn(""); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (FLAGS (\\Deleted) BODYSTRUCTURE (\"TEXT\" \"HTML\" (\"CHARSET\" \"US-ASCII\") \"\" \"\" \"7BIT\" 2279 48) UID 72)\r\n"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderTest.java index 2575b652fa..cdddc02321 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/FetchResponseEncoderTest.java @@ -54,6 +54,7 @@ class FetchResponseEncoderTest { FetchResponse message = new FetchResponse(MSN, flags, null, null, null, null, null, null, null, null, null, null, null); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (FLAGS (\\Deleted))\r\n"); @@ -64,6 +65,7 @@ class FetchResponseEncoderTest { FetchResponse message = new FetchResponse(MSN, null, MessageUid.of(72), null, null, null, null, null, null, null, null, null, null); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (UID 72)\r\n"); @@ -74,6 +76,7 @@ class FetchResponseEncoderTest { FetchResponse message = new FetchResponse(MSN, flags, MessageUid.of(72), null, null, null, null, null, null, null, null, null, null); encoder.encode(message, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* 100 FETCH (FLAGS (\\Deleted) UID 72)\r\n"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/ImapResponseComposerImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/ImapResponseComposerImplTest.java index 593516f38e..9be8d9aec6 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/ImapResponseComposerImplTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/ImapResponseComposerImplTest.java @@ -41,6 +41,7 @@ class ImapResponseComposerImplTest { Character c = 128; composer.quote(c.toString()); composer.end(); + composer.flush(); assertThat(writer.getString()).isEqualTo(" \"?\"\r\n"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/LSubResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/LSubResponseEncoderTest.java index c31a2b036f..6254474d77 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/LSubResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/LSubResponseEncoderTest.java @@ -46,6 +46,7 @@ class LSubResponseEncoderTest { @Test void encoderShouldIncludeLSUBCommand() throws Exception { encoder.encode(new LSubResponse("name", true, '.'), composer); + composer.flush(); assertThat(writer.getString()).startsWith("* LSUB"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/ListResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/ListResponseEncoderTest.java index d6559d7dda..dce57b7cfa 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/ListResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/ListResponseEncoderTest.java @@ -51,6 +51,7 @@ class ListResponseEncoderTest { encoder.encode(new ListResponse(MailboxMetaData.Children.HAS_CHILDREN, MailboxMetaData.Selectability.NONE, "name", '.', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER), composer); + composer.flush(); assertThat(writer.getString()).startsWith("* LIST"); } } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/ListingEncodingUtilsTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/ListingEncodingUtilsTest.java index ab748e79a8..171eb7c325 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/ListingEncodingUtilsTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/ListingEncodingUtilsTest.java @@ -46,6 +46,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.HAS_CHILDREN, Selectability.NONE, nameParameter, ((char) Character.UNASSIGNED), false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST (\\HasChildren) NIL \"mailbox\"\r\n"); } @@ -54,6 +55,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.HAS_CHILDREN, Selectability.NONE, nameParameter, '#', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST (\\HasChildren) \"#\" \"mailbox\"\r\n"); } @@ -62,6 +64,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.CHILDREN_ALLOWED_BUT_UNKNOWN, MailboxMetaData.Selectability.NONE, nameParameter, '.', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST () \".\" \"mailbox\"\r\n"); } @@ -70,6 +73,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.HAS_CHILDREN, Selectability.NONE, nameParameter, '.', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST (\\HasChildren) \".\" \"mailbox\"\r\n"); } @@ -78,6 +82,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.HAS_NO_CHILDREN, Selectability.NONE, nameParameter, '.', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST (\\HasNoChildren) \".\" \"mailbox\"\r\n"); } @@ -86,6 +91,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.NO_INFERIORS, Selectability.NOSELECT, nameParameter, '.', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST (\\Noselect \\Noinferiors) \".\" \"mailbox\"\r\n"); } @@ -94,6 +100,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.CHILDREN_ALLOWED_BUT_UNKNOWN, Selectability.MARKED, nameParameter, '.', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST (\\Marked) \".\" \"mailbox\"\r\n"); } @@ -102,6 +109,7 @@ public class ListingEncodingUtilsTest { ListResponse input = new ListResponse(Children.CHILDREN_ALLOWED_BUT_UNKNOWN, Selectability.UNMARKED, nameParameter, '.', false, false, EnumSet.noneOf(ListResponse.ChildInfo.class), MailboxType.OTHER); ListingEncodingUtils.encodeListingResponse(LIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* LIST (\\Unmarked) \".\" \"mailbox\"\r\n"); } @@ -110,6 +118,7 @@ public class ListingEncodingUtilsTest { XListResponse input = new XListResponse(Children.HAS_CHILDREN, Selectability.NONE, nameParameter, '.', MailboxType.INBOX); ListingEncodingUtils.encodeListingResponse(XLIST_COMMAND, composer, input); + composer.flush(); assertThat(writer.getString()).isEqualTo("* XLIST (\\HasChildren \\Inbox) \".\" \"mailbox\"\r\n"); } } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/MailboxStatusResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/MailboxStatusResponseEncoderTest.java index 2c590d8824..04ce8b6e98 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/MailboxStatusResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/MailboxStatusResponseEncoderTest.java @@ -60,6 +60,7 @@ class MailboxStatusResponseEncoderTest { encoder.encode(new MailboxStatusResponse(null, null, null, deletedStorage, messages, recent, uidNext, null, uidValidity, unseen, mailbox, null), composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* STATUS \"A mailbox named desire\" (MESSAGES 2 DELETED-STORAGE 13 RECENT 3 UIDNEXT 5 UIDVALIDITY 7 UNSEEN 11)\r\n"); } } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/MetadataResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/MetadataResponseEncoderTest.java index 96858a7009..2a3509ea1f 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/MetadataResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/MetadataResponseEncoderTest.java @@ -55,6 +55,7 @@ class MetadataResponseEncoderTest { MetadataResponse response = new MetadataResponse(null, ImmutableList.of()); encoder.encode(response, composer); + composer.flush(); assertThat(byteImapResponseWriter.getString()).isEqualTo("* METADATA \"\"\r\n"); } @@ -64,6 +65,7 @@ class MetadataResponseEncoderTest { MetadataResponse response = new MetadataResponse("INBOX", ImmutableList.of()); encoder.encode(response, composer); + composer.flush(); assertThat(byteImapResponseWriter.getString()).isEqualTo("* METADATA \"INBOX\"\r\n"); } @@ -73,6 +75,7 @@ class MetadataResponseEncoderTest { MetadataResponse response = new MetadataResponse("INBOX", ImmutableList.of(PRIVATE_ANNOTATION)); encoder.encode(response, composer); + composer.flush(); assertThat(byteImapResponseWriter.getString()).isEqualTo("* METADATA \"INBOX\" (/private/comment \"My own comment\")\r\n"); } @@ -81,6 +84,7 @@ class MetadataResponseEncoderTest { void encodingShouldWellFormWhenManyReturnedAnnotations() throws Exception { MetadataResponse response = new MetadataResponse("INBOX", ImmutableList.of(PRIVATE_ANNOTATION, SHARED_ANNOTATION)); encoder.encode(response, composer); + composer.flush(); assertThat(byteImapResponseWriter.getString()).isEqualTo("* METADATA \"INBOX\" (/private/comment \"My own comment\" /shared/comment \"Shared comment\")\r\n"); } @@ -90,6 +94,7 @@ class MetadataResponseEncoderTest { MetadataResponse response = new MetadataResponse("INBOX", ImmutableList.of(MailboxAnnotation.nil(PRIVATE_KEY))); encoder.encode(response, composer); + composer.flush(); assertThat(byteImapResponseWriter.getString()).isEqualTo("* METADATA \"INBOX\" ()\r\n"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaResponseEncoderTest.java index e50108d680..eb74b7c656 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaResponseEncoderTest.java @@ -43,6 +43,7 @@ class QuotaResponseEncoderTest { ImapResponseComposer composer = new ImapResponseComposerImpl(byteImapResponseWriter, 1024); QuotaResponseEncoder encoder = new QuotaResponseEncoder(); encoder.encode(response, composer); + composer.flush(); String responseString = byteImapResponseWriter.getString(); assertThat(responseString).isEqualTo("* QUOTA root (MESSAGE 231 1024)\r\n"); } @@ -55,6 +56,7 @@ class QuotaResponseEncoderTest { ImapResponseComposer composer = new ImapResponseComposerImpl(byteImapResponseWriter, 1024); QuotaResponseEncoder encoder = new QuotaResponseEncoder(); encoder.encode(response, composer); + composer.flush(); String responseString = byteImapResponseWriter.getString(); assertThat(responseString).isEqualTo("* QUOTA root (STORAGE 231 1024)\r\n"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaRootResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaRootResponseEncoderTest.java index b967430ec2..18d3c22983 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaRootResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/QuotaRootResponseEncoderTest.java @@ -37,6 +37,7 @@ class QuotaRootResponseEncoderTest { ImapResponseComposer composer = new ImapResponseComposerImpl(byteImapResponseWriter, 1024); QuotaRootResponseEncoder encoder = new QuotaRootResponseEncoder(); encoder.encode(response, composer); + composer.flush(); String responseString = byteImapResponseWriter.getString(); assertThat(responseString).isEqualTo("* QUOTAROOT \"INBOX\" root\r\n"); } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/SearchResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/SearchResponseEncoderTest.java index 5fd1135337..8a794545b7 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/SearchResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/SearchResponseEncoderTest.java @@ -59,6 +59,7 @@ class SearchResponseEncoderTest { @Test void testEncode() throws Exception { encoder.encode(response, composer); + composer.flush(); assertThat(writer.getString()).isEqualTo("* SEARCH 1 4 9 16\r\n"); } } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/encode/XListResponseEncoderTest.java b/protocols/imap/src/test/java/org/apache/james/imap/encode/XListResponseEncoderTest.java index bf5b18e324..63d38e56b5 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/encode/XListResponseEncoderTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/encode/XListResponseEncoderTest.java @@ -56,6 +56,7 @@ class XListResponseEncoderTest { '.', MailboxType.INBOX), composer); + composer.flush(); assertThat(writer.getString()).startsWith("* XLIST"); } } diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/SelectProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/SelectProcessorTest.java index 9a88be9e48..f1c1e71205 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/SelectProcessorTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/SelectProcessorTest.java @@ -147,11 +147,13 @@ class SelectProcessorTest { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ImapResponseComposerImpl composer = new ImapResponseComposerImpl(new OutputStreamImapResponseWriter(outputStream)); testee.process(message, new ResponseEncoder( new DefaultImapEncoderFactory(new DefaultLocalizer(), true).buildImapEncoder(), - new ImapResponseComposerImpl(new OutputStreamImapResponseWriter(outputStream))), + composer), session); + composer.flush(); assertThat(new String(outputStream.toByteArray())) .contains("* VANISHED (EARLIER) 2,4"); diff --git a/server/apps/memory-app/src/test/java/org/apache/james/IMAPIntegrationTest.java b/server/apps/memory-app/src/test/java/org/apache/james/IMAPIntegrationTest.java index 5eeef109ad..aef3dd4a11 100644 --- a/server/apps/memory-app/src/test/java/org/apache/james/IMAPIntegrationTest.java +++ b/server/apps/memory-app/src/test/java/org/apache/james/IMAPIntegrationTest.java @@ -35,7 +35,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -public class IMAPIntegrationTest { +class IMAPIntegrationTest { @RegisterExtension static JamesServerExtension jamesServerExtension = new JamesServerBuilder<MemoryJamesConfiguration>(tmpDir -> diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java index 78d96fe75e..0a4f53ce9c 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java @@ -41,9 +41,14 @@ import io.netty.handler.stream.ChunkedStream; * {@link Channel} */ public class ChannelImapResponseWriter implements ImapResponseWriter { + @FunctionalInterface + interface FlushCallback { + void run() throws IOException; + } private final Channel channel; private final boolean zeroCopy; + private FlushCallback flushCallback; public ChannelImapResponseWriter(Channel channel) { this(channel, true); @@ -52,17 +57,25 @@ public class ChannelImapResponseWriter implements ImapResponseWriter { public ChannelImapResponseWriter(Channel channel, boolean zeroCopy) { this.channel = channel; this.zeroCopy = zeroCopy; + this.flushCallback = () -> { + + }; + } + + public void setFlushCallback(FlushCallback flushCallback) { + this.flushCallback = flushCallback; } @Override - public void write(byte[] buffer) throws IOException { + public void write(byte[] buffer) { if (channel.isActive()) { - channel.write(Unpooled.wrappedBuffer(buffer)); + channel.writeAndFlush(Unpooled.wrappedBuffer(buffer)); } } @Override public void write(Literal literal) throws IOException { + flushCallback.run(); if (channel.isActive()) { InputStream in = literal.getInputStream(); if (in instanceof FileInputStream) { @@ -72,17 +85,18 @@ public class ChannelImapResponseWriter implements ImapResponseWriter { // See JAMES-1305 and JAMES-1306 ChannelPipeline cp = channel.pipeline(); if (zeroCopy && cp.get(SslHandler.class) == null && cp.get(ZlibEncoder.class) == null) { - channel.write(new DefaultFileRegion(fc, fc.position(), literal.size())); + channel.writeAndFlush(new DefaultFileRegion(fc, fc.position(), literal.size())); } else { - channel.write(new ChunkedNioFile(fc, 8192)); + channel.writeAndFlush(new ChunkedNioFile(fc, 8192)); } } else { - channel.write(new ChunkedStream(in)); + channel.writeAndFlush(new ChunkedStream(in)); } } } - public void flush() { + public void flush() throws IOException { + flushCallback.run(); channel.flush(); } diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java index 7963e12c35..d64d77ed89 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java @@ -36,7 +36,6 @@ import org.apache.james.imap.api.process.ImapProcessor; import org.apache.james.imap.api.process.ImapSession; import org.apache.james.imap.api.process.ImapSession.SessionId; import org.apache.james.imap.encode.ImapEncoder; -import org.apache.james.imap.encode.ImapResponseComposer; import org.apache.james.imap.encode.base.ImapResponseComposerImpl; import org.apache.james.imap.main.ResponseEncoder; import org.apache.james.imap.message.request.AbstractImapRequest; @@ -184,10 +183,10 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp imapConnectionsMetric.increment(); ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); - ImapResponseComposer response = new ImapResponseComposerImpl(writer); + ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); // write hello to client response.untagged().message("OK").message(hello).end(); - writer.flush(); + response.flush(); super.channelActive(ctx); } @@ -257,9 +256,9 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp // // See also JAMES-1190 ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); - ImapResponseComposer response = new ImapResponseComposerImpl(writer); + ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); response.untaggedResponse(ImapConstants.BAD + " failed. Maximum command line length exceeded"); - writer.flush(); + response.flush(); } else if (cause instanceof ReactiveThrottler.RejectedException) { manageRejectedException(ctx, (ReactiveThrottler.RejectedException) cause); @@ -269,15 +268,15 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp } } - private void manageRejectedException(ChannelHandlerContext ctx, ReactiveThrottler.RejectedException cause) { + private void manageRejectedException(ChannelHandlerContext ctx, ReactiveThrottler.RejectedException cause) throws IOException { if (cause.getImapMessage() instanceof AbstractImapRequest) { AbstractImapRequest req = (AbstractImapRequest) cause.getImapMessage(); ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); - ImapResponseComposer response = new ImapResponseComposerImpl(writer); + ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); new ResponseEncoder(encoder, response) .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(), new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null)); - writer.flush(); + response.flush(); } else { manageUnknownError(ctx); } @@ -318,7 +317,8 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY); ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); - ImapResponseComposer response = new ImapResponseComposerImpl(writer); + ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); + writer.setFlushCallback(response::flush); ImapMessage message = (ImapMessage) msg; beforeIDLEUponProcessing(ctx); @@ -356,7 +356,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp ctx.fireExceptionCaught(signal.getThrowable()); } disposableAttribute.set(null); - writer.flush(); + response.flush(); ctx.fireChannelReadComplete(); })) .contextWrite(ReactorUtils.context("imap", mdc(session))), message) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java index 611e84561d..6c8b228d10 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java @@ -183,7 +183,6 @@ public class NettyImapSession implements ImapSession, NettyConstants { } executeSafely(() -> { runnable.run(); - channel.flush(); channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler()); stopDetectingCommandInjection(); }); @@ -230,7 +229,6 @@ public class NettyImapSession implements ImapSession, NettyConstants { executeSafely(() -> { runnable.run(); - channel.flush(); ZlibDecoder decoder = new JZlibDecoder(ZlibWrapper.NONE); ZlibEncoder encoder = new JZlibEncoder(ZlibWrapper.NONE, 5); @@ -309,9 +307,4 @@ public class NettyImapSession implements ImapSession, NettyConstants { public void schedule(Runnable runnable, Duration waitDelay) { channel.eventLoop().schedule(runnable, waitDelay.toMillis(), TimeUnit.MILLISECONDS); } - - @Override - public void flush() { - channel.flush(); - } } diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java index 2fbe1f58b6..889ca4a2f6 100644 --- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java +++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java @@ -977,7 +977,7 @@ class IMAPServerTest { .doesNotThrowAnyException(); } - @Test + @RepeatedTest(100) void authenticatePlainShouldSucceed() { assertThatCode(() -> testIMAPClient.connect("127.0.0.1", port) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org