This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a1440fc7dd1d3cc876b6a2ca47ed756d58f01103
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Mon Feb 26 17:22:56 2024 +0100

    JAMES-4007 Avoid a callback
---
 .../imapserver/netty/ImapRequestFrameDecoder.java  | 61 +++++++++-------------
 1 file changed, 25 insertions(+), 36 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index ddee979b0f..27628ad1f1 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -34,7 +34,6 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -215,22 +214,7 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
             sink = Pair.of(Sinks.many().unicast().onBackpressureBuffer(), new 
AtomicInteger(0));
             attachment.put(SINK, sink);
 
-            FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size,
-                (file, written) -> {
-                    ImapRequestLineReader reader = new 
NettyStreamImapRequestLineReader(ctx.channel(), file, RETRY);
-
-                    try {
-                        parseImapMessage(ctx, null, attachment, 
Pair.of(reader, size), readerIndex)
-                            .ifPresent(message -> {
-                                ctx.fireChannelRead(message);
-                                // Remove ongoing subscription: now on 
lifecycle of the message will be managed by ImapChannelUpstreamHandler.
-                                // Not doing this causes IDLEd IMAP 
connections to clear IMAP append literal while they are processed.
-                                attachment.remove(SUBSCRIPTION);
-                            });
-                    } catch (DecodingException e) {
-                        ctx.fireExceptionCaught(e);
-                    }
-                });
+            FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size);
             Disposable subscribe = sink.getLeft().asFlux()
                 .publishOn(Schedulers.boundedElastic())
                 .subscribe(fileChunkConsumer,
@@ -239,7 +223,20 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
                         ctx.fireExceptionCaught(e);
                     },
                     () -> {
-
+                        fileChunkConsumer.finalizeDataTransfer();
+                        ImapRequestLineReader reader = new 
NettyStreamImapRequestLineReader(ctx.channel(), fileChunkConsumer.getFile(), 
RETRY);
+
+                        try {
+                            parseImapMessage(ctx, null, attachment, 
Pair.of(reader, size), readerIndex)
+                                .ifPresent(message -> {
+                                    ctx.fireChannelRead(message);
+                                    // Remove ongoing subscription: from now on, the lifecycle of the message will be managed by ImapChannelUpstreamHandler.
+                                    // Not doing this causes IDLEd IMAP connections to clear IMAP append literal while they are processed.
+                                    attachment.remove(SUBSCRIPTION);
+                                });
+                        } catch (DecodingException e) {
+                            ctx.fireExceptionCaught(e);
+                        }
                     });
             attachment.put(SUBSCRIPTION, (Disposable) () -> {
                 // Clear the file if the connection is reset while buffering the literal.
@@ -259,14 +256,16 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
     static class FileChunkConsumer implements Consumer<byte[]> {
         private final int size;
         private final AtomicInteger written = new AtomicInteger(0);
-        private final BiConsumer<File, Integer> callback;
         private final AtomicBoolean initialized = new AtomicBoolean(false);
         private OutputStream outputStream;
-        private File f;
+        private File file;
 
-        FileChunkConsumer(int size, BiConsumer<File, Integer> callback) {
+        FileChunkConsumer(int size) {
             this.size = size;
-            this.callback = callback;
+        }
+
+        public File getFile() {
+            return file;
         }
 
         @Override
@@ -276,17 +275,12 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
             }
 
             writeChunk(next);
-
-            // Check if all needed data was streamed to the file.
-            if (isComplete()) {
-                finalizeDataTransfer();
-            }
         }
 
         private void initialize() {
             try {
-                f = Files.createTempFile("imap-literal", ".tmp").toFile();
-                outputStream = new FileOutputStream(f, true);
+                file = Files.createTempFile("imap-literal", ".tmp").toFile();
+                outputStream = new FileOutputStream(file, true);
                 initialized.set(true);
             } catch (IOException e) {
                 throw new RuntimeException(e);
@@ -308,9 +302,6 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
             }
         }
 
-        private boolean isComplete() {
-            return written.get() == size;
-        }
 
         private void finalizeDataTransfer() {
             try {
@@ -319,8 +310,6 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
             } catch (IOException ignored) {
                 //ignore exception during close
             }
-
-            callback.accept(f, written.get());
         }
 
         void discard() {
@@ -328,8 +317,8 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
                 if (outputStream != null) {
                     outputStream.close();
                 }
-                if (f != null) {
-                    Files.delete(f.toPath());
+                if (file != null) {
+                    Files.delete(file.toPath());
                 }
             })).subscribeOn(Schedulers.boundedElastic())
                 .subscribe();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to