This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5d85bacd2f398b89fdac74c408cc8878648eb3d4
Author: Benoit TELLIER <[email protected]>
AuthorDate: Wed Apr 3 14:01:18 2024 +0200

    [FIX] Solve Netty native buffer leak
    
    When using the AES blobStore, switching the S3 object consumption out of
    the eventLoop causes the allocation of ByteBuf items onto the boundedElastic
    scheduler. As ByteBuf allocation relies on ThreadLocal, this in turn causes
    the instantiation of a way-too-large count of Netty ByteBuf arenas and can
    be perceived as a native memory leak (far over 1GB allocation, while before
    usage was limited to ~150MB).
    
    As we cannot rely on the previous patch that did block on the S3 driver
    Netty event loop, and as we cannot keep the current native memory
    over-consumption, this patch disables the intermediary file buffering
    when using the AESBlobStore. This was introduced in the first place
    to mitigate the lack of IMAP backpressure. That being solved, we can afford
    to pay a bit of HEAP on the AES blob store.
    
    Note the approximate pre-sizing of the output stream in order to limit
    allocation.
---
 .../org/apache/james/blob/aes/AESBlobStoreDAO.java | 50 ++++++----------------
 1 file changed, 12 insertions(+), 38 deletions(-)

diff --git 
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
 
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index e9562c014d..cabbee8d7f 100644
--- 
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ 
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -28,16 +28,15 @@ import java.util.Collection;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
-import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.Size;
 import org.reactivestreams.Publisher;
-import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
@@ -47,7 +46,6 @@ import com.google.common.io.CountingOutputStream;
 import com.google.common.io.FileBackedOutputStream;
 import com.google.crypto.tink.subtle.AesGcmHkdfStreaming;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -60,10 +58,6 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
         .map(s -> Size.parse(s, Size.Unit.NoUnit))
         .map(s -> (int) s.asBytes())
         .orElse(100 * 1024 * 1024);
-    public static final int FILE_THRESHOLD_DECRYPT = 
Optional.ofNullable(System.getProperty("james.blob.aes.file.threshold.decrypt"))
-        .map(s -> Size.parse(s, Size.Unit.NoUnit))
-        .map(s -> (int) s.asBytes())
-        .orElse(512 * 1024);
     private final BlobStoreDAO underlying;
     private final AesGcmHkdfStreaming streamingAead;
 
@@ -98,34 +92,6 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
         }
     }
 
-    public Mono<byte[]> decryptReactiveByteSource(ReactiveByteSource 
ciphertext, BlobId blobId) {
-        if (ciphertext.getSize() > MAXIMUM_BLOB_SIZE) {
-            throw new RuntimeException(blobId.asString() + " exceeded maximum 
blob size");
-        }
-
-        return Mono.fromCallable(() -> {
-                try {
-                    FileBackedOutputStream decryptedContent = new 
FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
-                    try {
-                        CountingOutputStream countingOutputStream = new 
CountingOutputStream(decryptedContent);
-                        try (InputStream ciphertextStream = 
ReactorUtils.toInputStream(Flux.from(ciphertext.getContent()))) {
-                            
decrypt(ciphertextStream).transferTo(countingOutputStream);
-                        }
-                        try (InputStream decryptedStream = 
decryptedContent.asByteSource().openStream()) {
-                            return IOUtils.toByteArray(decryptedStream, 
countingOutputStream.getCount());
-                        }
-                    } finally {
-                        decryptedContent.reset();
-                        decryptedContent.close();
-                    }
-                } catch (OutOfMemoryError error) {
-                    LoggerFactory.getLogger(AESBlobStoreDAO.class)
-                        .error("OOM reading {}. Blob size read so far {} 
bytes.", blobId.asString(), ciphertext.getSize());
-                    throw error;
-                }
-            });
-    }
-
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException {
         try {
@@ -143,9 +109,17 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
 
     @Override
     public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return Mono.from(underlying.readAsByteSource(bucketName, blobId))
-            .publishOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
-            .flatMap(reactiveByteSource -> 
decryptReactiveByteSource(reactiveByteSource, blobId));
+        return Mono.from(underlying.readBytes(bucketName, blobId))
+            .map(Throwing.function(bytes -> {
+                InputStream inputStream = decrypt(new 
ByteArrayInputStream(bytes));
+                int aesPadding = 128;
+                try (UnsynchronizedByteArrayOutputStream outputStream = 
UnsynchronizedByteArrayOutputStream.builder()
+                    .setBufferSize(bytes.length + aesPadding)
+                    .get()) {
+                    IOUtils.copy(inputStream, outputStream);
+                    return outputStream.toByteArray();
+                }
+            }));
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to