This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 66426a8005 JAMES-4002 AES blob store tuning to reduce memory pressure 
(#2021)
66426a8005 is described below

commit 66426a8005be4ecbb174b95450bd945ae0fd2f73
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Mon Feb 26 07:27:15 2024 +0100

    JAMES-4002 AES blob store tuning to reduce memory pressure (#2021)
    
    
     - Greatly reduce memory pressure thanks to buffering
     - Handle OOM - turns out in this situation we might recover from it
    
    Co-authored-by: Trần Hồng Quân 
<55171818+quantranhong1...@users.noreply.github.com>
---
 .../modules/ROOT/pages/configure/blobstore.adoc    | 16 +++++
 .../sample-configuration/jvm.properties            | 12 ++++
 server/blob/blob-aes/pom.xml                       |  4 ++
 .../org/apache/james/blob/aes/AESBlobStoreDAO.java | 76 +++++++++++++++++++---
 .../org/apache/james/blob/api/BlobStoreDAO.java    | 26 ++++++++
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 10 +++
 6 files changed, 135 insertions(+), 9 deletions(-)

diff --git 
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc 
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc
index 3e599f03c5..ca11b9a278 100644
--- 
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc
+++ 
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc
@@ -84,6 +84,22 @@ openssl rand -base64 64
 generate salt with : openssl rand -hex 16
 ....
 
+AES blob store supports the following system properties that could be 
configured in `jvm.properties`:
+
+....
+# Threshold from which we should buffer the blob to a file upon encrypting
+# Unit supported: K, M, G, default to no unit
+james.blob.aes.file.threshold.encrypt=100K
+
+# Threshold from which we should buffer the blob to a file upon decrypting
+# Unit supported: K, M, G, default to no unit
+james.blob.aes.file.threshold.decrypt=256K
+
+# Maximum size of a blob. Larger blobs will be rejected.
+# Unit supported: K, M, G, default to no unit
+james.blob.aes.blob.max.size=100M
+....
+
 === Cassandra BlobStore Cache
 
 A Cassandra cache can be enabled to reduce latency when reading small blobs 
frequently.
diff --git a/server/apps/distributed-app/sample-configuration/jvm.properties 
b/server/apps/distributed-app/sample-configuration/jvm.properties
index 8cf160fce6..2e78d542f8 100644
--- a/server/apps/distributed-app/sample-configuration/jvm.properties
+++ b/server/apps/distributed-app/sample-configuration/jvm.properties
@@ -62,3 +62,15 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
 
 # Default charset to use in JMAP to present text body parts
 # james.jmap.default.charset=US-ASCII
+
+# Threshold from which we should buffer the blob to a file upon encrypting
+# Unit supported: K, M, G, default to no unit
+#james.blob.aes.file.threshold.encrypt=100K
+
+# Threshold from which we should buffer the blob to a file upon decrypting
+# Unit supported: K, M, G, default to no unit
+#james.blob.aes.file.threshold.decrypt=256K
+
+# Maximum size of a blob. Larger blobs will be rejected.
+# Unit supported: K, M, G, default to no unit
+#james.blob.aes.blob.max.size=100M
diff --git a/server/blob/blob-aes/pom.xml b/server/blob/blob-aes/pom.xml
index 3de703d6af..842bde3316 100644
--- a/server/blob/blob-aes/pom.xml
+++ b/server/blob/blob-aes/pom.xml
@@ -66,6 +66,10 @@
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.james</groupId>
+            <artifactId>james-server-util</artifactId>
+        </dependency>
     </dependencies>
 
 
diff --git 
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
 
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index 3d75499fd4..dfd2750f11 100644
--- 
a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ 
b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -23,8 +23,11 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.Collection;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
@@ -32,20 +35,35 @@ import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.util.Size;
 import org.reactivestreams.Publisher;
+import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteSource;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
 import com.google.common.io.FileBackedOutputStream;
 import com.google.crypto.tink.subtle.AesGcmHkdfStreaming;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class AESBlobStoreDAO implements BlobStoreDAO {
-    // For now, aligned with with MimeMessageInputStreamSource file threshold, 
detailed benchmarking might be conducted to challenge this choice
-    public static final int FILE_THRESHOLD_100_KB = 100 * 1024;
+    public static final int FILE_THRESHOLD_ENCRYPT = 
Optional.ofNullable(System.getProperty("james.blob.aes.file.threshold.encrypt"))
+        .map(s -> Size.parse(s, Size.Unit.NoUnit))
+        .map(s -> (int) s.asBytes())
+        .orElse(100 * 1024);
+    public static final int MAXIMUM_BLOB_SIZE =  
Optional.ofNullable(System.getProperty("james.blob.aes.blob.max.size"))
+        .map(s -> Size.parse(s, Size.Unit.NoUnit))
+        .map(s -> (int) s.asBytes())
+        .orElse(100 * 1024 * 1024);
+    public static final int FILE_THRESHOLD_DECRYPT = 
Optional.ofNullable(System.getProperty("james.blob.aes.file.threshold.decrypt"))
+        .map(s -> Size.parse(s, Size.Unit.NoUnit))
+        .map(s -> (int) s.asBytes())
+        .orElse(512 * 1024);
     private final BlobStoreDAO underlying;
     private final AesGcmHkdfStreaming streamingAead;
 
@@ -55,7 +73,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
     }
 
     public FileBackedOutputStream encrypt(InputStream input) {
-        try (FileBackedOutputStream encryptedContent = new 
FileBackedOutputStream(FILE_THRESHOLD_100_KB)) {
+        try (FileBackedOutputStream encryptedContent = new 
FileBackedOutputStream(FILE_THRESHOLD_ENCRYPT)) {
             OutputStream outputStream = 
streamingAead.newEncryptingStream(encryptedContent, 
PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA);
             input.transferTo(outputStream);
             outputStream.close();
@@ -68,12 +86,52 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
     public InputStream decrypt(InputStream ciphertext) throws IOException {
         // We break symmetry and avoid allocating resources like files as we 
are not able, in higher level APIs (mailbox) to do resource cleanup.
         try {
-            return streamingAead.newDecryptingStream(ciphertext, 
PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA);
+            return ByteStreams.limit(
+                streamingAead.newDecryptingStream(ciphertext, 
PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA),
+                    MAXIMUM_BLOB_SIZE);
         } catch (GeneralSecurityException e) {
             throw new IOException("Incorrect crypto setup", e);
         }
     }
 
+    public Mono<byte[]> decryptReactiveByteSource(ReactiveByteSource 
ciphertext, BlobId blobId) {
+        if (ciphertext.getSize() > MAXIMUM_BLOB_SIZE) {
+            throw new RuntimeException(blobId.asString() + " exceeded maximum 
blob size");
+        }
+
+        FileBackedOutputStream encryptedContent = new 
FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
+        WritableByteChannel channel = Channels.newChannel(encryptedContent);
+
+        return Flux.from(ciphertext.getContent())
+            .doOnNext(Throwing.consumer(channel::write))
+            .then(Mono.fromCallable(() -> {
+                try {
+                    FileBackedOutputStream decryptedContent = new 
FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
+                    try {
+                        CountingOutputStream countingOutputStream = new 
CountingOutputStream(decryptedContent);
+                        try (InputStream ciphertextStream = 
encryptedContent.asByteSource().openStream()) {
+                            
decrypt(ciphertextStream).transferTo(countingOutputStream);
+                        }
+                        try (InputStream decryptedStream = 
decryptedContent.asByteSource().openStream()) {
+                            return IOUtils.toByteArray(decryptedStream, 
countingOutputStream.getCount());
+                        }
+                    } finally {
+                        decryptedContent.reset();
+                        decryptedContent.close();
+                    }
+                } catch (OutOfMemoryError error) {
+                    LoggerFactory.getLogger(AESBlobStoreDAO.class)
+                        .error("OOM reading {}. Blob size read so far {} 
bytes.", blobId.asString(), ciphertext.getSize());
+                    throw error;
+                }
+            }))
+            .doFinally(Throwing.consumer(any -> {
+                channel.close();
+                encryptedContent.reset();
+                encryptedContent.close();
+            }));
+    }
+
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException {
         try {
@@ -91,10 +149,9 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
 
     @Override
     public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return Mono.from(underlying.readBytes(bucketName, blobId))
-            .map(ByteArrayInputStream::new)
-            .map(Throwing.function(this::decrypt))
-            .map(Throwing.function(IOUtils::toByteArray));
+        return Mono.from(underlying.readAsByteSource(bucketName, blobId))
+            .flatMap(reactiveByteSource -> 
decryptReactiveByteSource(reactiveByteSource, blobId))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     @Override
@@ -128,7 +185,8 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
 
         return Mono.using(content::openStream,
             in -> Mono.from(save(bucketName, blobId, in)),
-            Throwing.consumer(InputStream::close));
+            Throwing.consumer(InputStream::close))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     @Override
diff --git 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
index ac962047ca..98312dd0c2 100644
--- 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
+++ 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
@@ -20,6 +20,7 @@
 package org.apache.james.blob.api;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 
@@ -27,7 +28,27 @@ import org.reactivestreams.Publisher;
 
 import com.google.common.io.ByteSource;
 
+import reactor.core.publisher.Mono;
+
 public interface BlobStoreDAO {
+    class ReactiveByteSource {
+        private final long size;
+        private final Publisher<ByteBuffer> content;
+
+        public ReactiveByteSource(long size, Publisher<ByteBuffer> content) {
+            this.size = size;
+            this.content = content;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public Publisher<ByteBuffer> getContent() {
+            return content;
+        }
+    }
+
 
     /**
      * Reads a Blob based on its BucketName and its BlobId.
@@ -37,6 +58,11 @@ public interface BlobStoreDAO {
      */
     InputStream read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException;
 
+    default Publisher<ReactiveByteSource> readAsByteSource(BucketName 
bucketName, BlobId blobId) {
+        return Mono.from(readBytes(bucketName, blobId))
+            .map(bytes -> new ReactiveByteSource(bytes.length, 
Mono.just(ByteBuffer.wrap(bytes))));
+    }
+
     Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId);
 
     /**
diff --git 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 2595558b81..8922f793fc 100644
--- 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -235,6 +235,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO, 
Startable, Closeable {
             .map(res -> ReactorUtils.toInputStream(res.flux));
     }
 
+    @Override
+    public Publisher<ReactiveByteSource> readAsByteSource(BucketName 
bucketName, BlobId blobId) {
+        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
+        return getObject(resolvedBucketName, blobId)
+            .onErrorMap(NoSuchBucketException.class, e -> new 
ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
+            .onErrorMap(NoSuchKeyException.class, e -> new 
ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + 
resolvedBucketName.asString(), e))
+            .map(res -> new 
ReactiveByteSource(res.sdkResponse.contentLength(), res.flux));
+    }
+
     private static class FluxResponse {
         final CompletableFuture<FluxResponse> supportingCompletableFuture = 
new CompletableFuture<>();
         GetObjectResponse sdkResponse;


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to