This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a1440fc7dd1d3cc876b6a2ca47ed756d58f01103 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Mon Feb 26 17:22:56 2024 +0100 JAMES-4007 Avoid a callback --- .../imapserver/netty/ImapRequestFrameDecoder.java | 61 +++++++++------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java index ddee979b0f..27628ad1f1 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java @@ -34,7 +34,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import java.util.function.Consumer; import org.apache.commons.lang3.tuple.Pair; @@ -215,22 +214,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net sink = Pair.of(Sinks.many().unicast().onBackpressureBuffer(), new AtomicInteger(0)); attachment.put(SINK, sink); - FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size, - (file, written) -> { - ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), file, RETRY); - - try { - parseImapMessage(ctx, null, attachment, Pair.of(reader, size), readerIndex) - .ifPresent(message -> { - ctx.fireChannelRead(message); - // Remove ongoing subscription: now on lifecycle of the message will be managed by ImapChannelUpstreamHandler. - // Not doing this causes IDLEd IMAP connections to clear IMAP append literal while they are processed. - attachment.remove(SUBSCRIPTION); - }); - } catch (DecodingException e) { - ctx.fireExceptionCaught(e); - } - }); + FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size); Disposable subscribe = sink.getLeft().asFlux() .publishOn(Schedulers.boundedElastic()) .subscribe(fileChunkConsumer, @@ -239,7 +223,20 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net ctx.fireExceptionCaught(e); }, () -> { - + fileChunkConsumer.finalizeDataTransfer(); + ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), fileChunkConsumer.getFile(), RETRY); + + try { + parseImapMessage(ctx, null, attachment, Pair.of(reader, size), readerIndex) + .ifPresent(message -> { + ctx.fireChannelRead(message); + // Remove ongoing subscription: now on lifecycle of the message will be managed by ImapChannelUpstreamHandler. + // Not doing this causes IDLEd IMAP connections to clear IMAP append literal while they are processed. + attachment.remove(SUBSCRIPTION); + }); + } catch (DecodingException e) { + ctx.fireExceptionCaught(e); + } }); attachment.put(SUBSCRIPTION, (Disposable) () -> { // Clear the file if the connection is reset while buffering the litteral. @@ -259,14 +256,16 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net static class FileChunkConsumer implements Consumer<byte[]> { private final int size; private final AtomicInteger written = new AtomicInteger(0); - private final BiConsumer<File, Integer> callback; private final AtomicBoolean initialized = new AtomicBoolean(false); private OutputStream outputStream; - private File f; + private File file; - FileChunkConsumer(int size, BiConsumer<File, Integer> callback) { + FileChunkConsumer(int size) { this.size = size; - this.callback = callback; + } + + public File getFile() { + return file; } @Override @@ -276,17 +275,12 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net } writeChunk(next); - - // Check if all needed data was streamed to the file. - if (isComplete()) { - finalizeDataTransfer(); - } } private void initialize() { try { - f = Files.createTempFile("imap-literal", ".tmp").toFile(); - outputStream = new FileOutputStream(f, true); + file = Files.createTempFile("imap-literal", ".tmp").toFile(); + outputStream = new FileOutputStream(file, true); initialized.set(true); } catch (IOException e) { throw new RuntimeException(e); @@ -308,9 +302,6 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net } } - private boolean isComplete() { - return written.get() == size; - } private void finalizeDataTransfer() { try { @@ -319,8 +310,6 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net } catch (IOException ignored) { //ignore exception during close } - - callback.accept(f, written.get()); } void discard() { @@ -328,8 +317,8 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net if (outputStream != null) { outputStream.close(); } - if (f != null) { - Files.delete(f.toPath()); + if (file != null) { + Files.delete(file.toPath()); } })).subscribeOn(Schedulers.boundedElastic()) .subscribe(); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org