This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 802c18e907a2ccfb6700e4fa8e80df1eef2aa1ba Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Mon Feb 26 17:19:00 2024 +0100 JAMES-4007 Avoid leaking literal flux The onComplete was never called, causing sinks and flux to hang around --- .../james/imapserver/netty/ImapRequestFrameDecoder.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java index f4f0717a59..ddee979b0f 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java @@ -205,14 +205,14 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net } private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size, int readerIndex) throws IOException { - Sinks.Many<byte[]> sink; + Pair<Sinks.Many<byte[]>, AtomicInteger> sink; // check if we have created a temporary file already or if // we need to create a new one if (attachment.containsKey(SINK)) { - sink = (Sinks.Many<byte[]>) attachment.get(SINK); + sink = (Pair<Sinks.Many<byte[]>, AtomicInteger>) attachment.get(SINK); } else { - sink = Sinks.many().unicast().onBackpressureBuffer(); + sink = Pair.of(Sinks.many().unicast().onBackpressureBuffer(), new AtomicInteger(0)); attachment.put(SINK, sink); FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size, @@ -231,7 +231,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net ctx.fireExceptionCaught(e); } }); - Disposable subscribe = sink.asFlux() + Disposable subscribe = sink.getLeft().asFlux() .publishOn(Schedulers.boundedElastic()) .subscribe(fileChunkConsumer, e -> { 
@@ -250,7 +250,10 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net int readableBytes = in.readableBytes(); byte[] bytes = new byte[readableBytes]; in.readBytes(bytes); - sink.emitNext(bytes, FAIL_FAST); + sink.getLeft().emitNext(bytes, FAIL_FAST); + if (sink.getRight().addAndGet(readableBytes) >= size) { + sink.getLeft().tryEmitComplete(); + } } static class FileChunkConsumer implements Consumer<byte[]> { --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org