This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 802c18e907a2ccfb6700e4fa8e80df1eef2aa1ba
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Mon Feb 26 17:19:00 2024 +0100

    JAMES-4007 Avoid leaking litteral flux
    
    The oncomplete was never called causing sinks and flux to
    hang around
---
 .../james/imapserver/netty/ImapRequestFrameDecoder.java     | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index f4f0717a59..ddee979b0f 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -205,14 +205,14 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
     }
 
     private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, 
Map<String, Object> attachment, int size, int readerIndex) throws IOException {
-        Sinks.Many<byte[]> sink;
+        Pair<Sinks.Many<byte[]>, AtomicInteger> sink;
 
         // check if we have created a temporary file already or if
         // we need to create a new one
         if (attachment.containsKey(SINK)) {
-            sink = (Sinks.Many<byte[]>) attachment.get(SINK);
+            sink = (Pair<Sinks.Many<byte[]>, AtomicInteger>) 
attachment.get(SINK);
         } else {
-            sink = Sinks.many().unicast().onBackpressureBuffer();
+            sink = Pair.of(Sinks.many().unicast().onBackpressureBuffer(), new 
AtomicInteger(0));
             attachment.put(SINK, sink);
 
             FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size,
@@ -231,7 +231,7 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
                         ctx.fireExceptionCaught(e);
                     }
                 });
-            Disposable subscribe = sink.asFlux()
+            Disposable subscribe = sink.getLeft().asFlux()
                 .publishOn(Schedulers.boundedElastic())
                 .subscribe(fileChunkConsumer,
                     e -> {
@@ -250,7 +250,10 @@ public class ImapRequestFrameDecoder extends 
ByteToMessageDecoder implements Net
         int readableBytes = in.readableBytes();
         byte[] bytes = new byte[readableBytes];
         in.readBytes(bytes);
-        sink.emitNext(bytes, FAIL_FAST);
+        sink.getLeft().emitNext(bytes, FAIL_FAST);
+        if (sink.getRight().addAndGet(readableBytes) >= size) {
+            sink.getLeft().tryEmitComplete();
+        }
     }
 
     static class FileChunkConsumer implements Consumer<byte[]> {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to