This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
     new 66426a8005 JAMES-4002 AES blob store tuning to reduce memory pressure (#2021)
66426a8005 is described below

commit 66426a8005be4ecbb174b95450bd945ae0fd2f73
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Mon Feb 26 07:27:15 2024 +0100

    JAMES-4002 AES blob store tuning to reduce memory pressure (#2021)

    - Greatly reduce memory pressure thanks to buffering
    - Handle OOM - turns out in this situation we might recover from them

    Co-authored-by: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com>
---
 .../modules/ROOT/pages/configure/blobstore.adoc     | 16 +++++
 .../sample-configuration/jvm.properties             | 12 ++++
 server/blob/blob-aes/pom.xml                        |  4 ++
 .../org/apache/james/blob/aes/AESBlobStoreDAO.java  | 76 +++++++++++++++++++---
 .../org/apache/james/blob/api/BlobStoreDAO.java     | 26 ++++++++
 .../blob/objectstorage/aws/S3BlobStoreDAO.java      | 10 +++
 6 files changed, 135 insertions(+), 9 deletions(-)

diff --git a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc
index 3e599f03c5..ca11b9a278 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/blobstore.adoc
@@ -84,6 +84,22 @@ openssl rand -base64 64
 generate salt with :
 openssl rand -hex 16
 ....
+AES blob store supports the following system properties that could be configured in `jvm.properties`:
+
+....
+# Threshold from which we should buffer the blob to a file upon encrypting
+# Unit supported: K, M, G, default to no unit
+james.blob.aes.file.threshold.encrypt=100K
+
+# Threshold from which we should buffer the blob to a file upon decrypting
+# Unit supported: K, M, G, default to no unit
+james.blob.aes.file.threshold.decrypt=256K
+
+# Maximum size of a blob. Larger blobs will be rejected.
+# Unit supported: K, M, G, default to no unit
+james.blob.aes.blob.max.size=100M
+....
+
 === Cassandra BlobStore Cache
 
 A Cassandra cache can be enabled to reduce latency when reading small blobs frequently.
diff --git a/server/apps/distributed-app/sample-configuration/jvm.properties b/server/apps/distributed-app/sample-configuration/jvm.properties
index 8cf160fce6..2e78d542f8 100644
--- a/server/apps/distributed-app/sample-configuration/jvm.properties
+++ b/server/apps/distributed-app/sample-configuration/jvm.properties
@@ -62,3 +62,15 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
 
 # Default charset to use in JMAP to present text body parts
 # james.jmap.default.charset=US-ASCII
+
+# Threshold from which we should buffer the blob to a file upon encrypting
+# Unit supported: K, M, G, default to no unit
+#james.blob.aes.file.threshold.encrypt=100K
+
+# Threshold from which we should buffer the blob to a file upon decrypting
+# Unit supported: K, M, G, default to no unit
+#james.blob.aes.file.threshold.decrypt=256K
+
+# Maximum size of a blob. Larger blobs will be rejected.
+# Unit supported: K, M, G, default to no unit
+#james.blob.aes.blob.max.size=100M
diff --git a/server/blob/blob-aes/pom.xml b/server/blob/blob-aes/pom.xml
index 3de703d6af..842bde3316 100644
--- a/server/blob/blob-aes/pom.xml
+++ b/server/blob/blob-aes/pom.xml
@@ -66,6 +66,10 @@
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.james</groupId>
+            <artifactId>james-server-util</artifactId>
+        </dependency>
     </dependencies>
diff --git a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index 3d75499fd4..dfd2750f11 100644
--- a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -23,8 +23,11 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.Collection;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
@@ -32,20 +35,35 @@ import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.util.Size;
 import org.reactivestreams.Publisher;
+import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteSource;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
 import com.google.common.io.FileBackedOutputStream;
 import com.google.crypto.tink.subtle.AesGcmHkdfStreaming;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class AESBlobStoreDAO implements BlobStoreDAO {
-    // For now, aligned with with MimeMessageInputStreamSource file threshold, detailed benchmarking might be conducted to challenge this choice
-    public static final int FILE_THRESHOLD_100_KB = 100 * 1024;
+    public static final int FILE_THRESHOLD_ENCRYPT = Optional.ofNullable(System.getProperty("james.blob.aes.file.threshold.encrypt"))
+        .map(s -> Size.parse(s, Size.Unit.NoUnit))
+        .map(s -> (int) s.asBytes())
+        .orElse(100 * 1024);
+    public static final int MAXIMUM_BLOB_SIZE = Optional.ofNullable(System.getProperty("james.blob.aes.blob.max.size"))
+        .map(s -> Size.parse(s, Size.Unit.NoUnit))
+        .map(s -> (int) s.asBytes())
+        .orElse(100 * 1024 * 1024);
+    public static final int FILE_THRESHOLD_DECRYPT = Optional.ofNullable(System.getProperty("james.blob.aes.file.threshold.decrypt"))
+        .map(s -> Size.parse(s, Size.Unit.NoUnit))
+        .map(s -> (int) s.asBytes())
+        .orElse(512 * 1024);
 
     private final BlobStoreDAO underlying;
     private final AesGcmHkdfStreaming streamingAead;
@@ -55,7 +73,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
     }
 
     public FileBackedOutputStream encrypt(InputStream input) {
-        try (FileBackedOutputStream encryptedContent = new FileBackedOutputStream(FILE_THRESHOLD_100_KB)) {
+        try (FileBackedOutputStream encryptedContent = new FileBackedOutputStream(FILE_THRESHOLD_ENCRYPT)) {
             OutputStream outputStream = streamingAead.newEncryptingStream(encryptedContent, PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA);
             input.transferTo(outputStream);
             outputStream.close();
@@ -68,12 +86,52 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
     public InputStream decrypt(InputStream ciphertext) throws IOException {
         // We break symmetry and avoid allocating resources like files as we are not able, in higher level APIs (mailbox) to do resource cleanup.
         try {
-            return streamingAead.newDecryptingStream(ciphertext, PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA);
+            return ByteStreams.limit(
+                streamingAead.newDecryptingStream(ciphertext, PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA),
+                MAXIMUM_BLOB_SIZE);
         } catch (GeneralSecurityException e) {
             throw new IOException("Incorrect crypto setup", e);
         }
     }
 
+    public Mono<byte[]> decryptReactiveByteSource(ReactiveByteSource ciphertext, BlobId blobId) {
+        if (ciphertext.getSize() > MAXIMUM_BLOB_SIZE) {
+            throw new RuntimeException(blobId.asString() + " exceeded maximum blob size");
+        }
+
+        FileBackedOutputStream encryptedContent = new FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
+        WritableByteChannel channel = Channels.newChannel(encryptedContent);
+
+        return Flux.from(ciphertext.getContent())
+            .doOnNext(Throwing.consumer(channel::write))
+            .then(Mono.fromCallable(() -> {
+                try {
+                    FileBackedOutputStream decryptedContent = new FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
+                    try {
+                        CountingOutputStream countingOutputStream = new CountingOutputStream(decryptedContent);
+                        try (InputStream ciphertextStream = encryptedContent.asByteSource().openStream()) {
+                            decrypt(ciphertextStream).transferTo(countingOutputStream);
+                        }
+                        try (InputStream decryptedStream = decryptedContent.asByteSource().openStream()) {
+                            return IOUtils.toByteArray(decryptedStream, countingOutputStream.getCount());
+                        }
+                    } finally {
+                        decryptedContent.reset();
+                        decryptedContent.close();
+                    }
+                } catch (OutOfMemoryError error) {
+                    LoggerFactory.getLogger(AESBlobStoreDAO.class)
+                        .error("OOM reading {}. Blob size read so far {} bytes.", blobId.asString(), ciphertext.getSize());
+                    throw error;
+                }
+            }))
+            .doFinally(Throwing.consumer(any -> {
+                channel.close();
+                encryptedContent.reset();
+                encryptedContent.close();
+            }));
+    }
+
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
         try {
@@ -91,10 +149,9 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
 
     @Override
     public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return Mono.from(underlying.readBytes(bucketName, blobId))
-            .map(ByteArrayInputStream::new)
-            .map(Throwing.function(this::decrypt))
-            .map(Throwing.function(IOUtils::toByteArray));
+        return Mono.from(underlying.readAsByteSource(bucketName, blobId))
+            .flatMap(reactiveByteSource -> decryptReactiveByteSource(reactiveByteSource, blobId))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     @Override
@@ -128,7 +185,8 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
         return Mono.using(content::openStream,
             in -> Mono.from(save(bucketName, blobId, in)),
-            Throwing.consumer(InputStream::close));
+            Throwing.consumer(InputStream::close))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     @Override
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
index ac962047ca..98312dd0c2 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
@@ -20,6 +20,7 @@ package org.apache.james.blob.api;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 
@@ -27,7 +28,27 @@ import org.reactivestreams.Publisher;
 
 import com.google.common.io.ByteSource;
 
+import reactor.core.publisher.Mono;
+
 public interface BlobStoreDAO {
+    class ReactiveByteSource {
+        private final long size;
+        private final Publisher<ByteBuffer> content;
+
+        public ReactiveByteSource(long size, Publisher<ByteBuffer> content) {
+            this.size = size;
+            this.content = content;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public Publisher<ByteBuffer> getContent() {
+            return content;
+        }
+    }
+
     /**
      * Reads a Blob based on its BucketName and its BlobId.
@@ -37,6 +58,11 @@
      */
     InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException;
 
+    default Publisher<ReactiveByteSource> readAsByteSource(BucketName bucketName, BlobId blobId) {
+        return Mono.from(readBytes(bucketName, blobId))
+            .map(bytes -> new ReactiveByteSource(bytes.length, Mono.just(ByteBuffer.wrap(bytes))));
+    }
+
     Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId);
 
     /**
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 2595558b81..8922f793fc 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -235,6 +235,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .map(res -> ReactorUtils.toInputStream(res.flux));
     }
 
+    @Override
+    public Publisher<ReactiveByteSource> readAsByteSource(BucketName bucketName, BlobId blobId) {
+        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
+        return getObject(resolvedBucketName, blobId)
+            .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
+            .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
+            .map(res -> new ReactiveByteSource(res.sdkResponse.contentLength(), res.flux));
+    }
+
     private static class FluxResponse {
         final CompletableFuture<FluxResponse> supportingCompletableFuture = new CompletableFuture<>();
        GetObjectResponse sdkResponse;
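
A note for reviewers unfamiliar with the buffering primitive this change leans on: the memory-pressure reduction comes from Guava's FileBackedOutputStream, which keeps written content on the heap only up to a configured threshold and transparently spills larger content to a temporary file. The standalone Java sketch below illustrates that pattern under illustrative assumptions (the class name, threshold, and payload are made up for the example and are not the values James uses at runtime); it mirrors the write / re-read via asByteSource() / clean up with reset() sequence used by encrypt() and decryptReactiveByteSource() above.

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import com.google.common.io.FileBackedOutputStream;

public class FileThresholdBufferingSketch {
    // Illustrative threshold: content up to 8 KiB stays in memory, larger content spills to a temp file.
    private static final int FILE_THRESHOLD = 8 * 1024;

    public static void main(String[] args) throws IOException {
        FileBackedOutputStream buffer = new FileBackedOutputStream(FILE_THRESHOLD);
        try {
            // Write the (possibly large) payload; only the first FILE_THRESHOLD bytes are held on the heap.
            buffer.write("sample payload".getBytes(StandardCharsets.UTF_8));

            // The buffered bytes can be re-read any number of times without re-materialising them in memory.
            try (InputStream in = buffer.asByteSource().openStream()) {
                System.out.println("Buffered " + in.readAllBytes().length + " bytes");
            }
        } finally {
            buffer.reset();  // Frees the in-memory buffer and deletes the backing temp file, if one was created.
            buffer.close();
        }
    }
}

The same trade-off explains the two thresholds introduced by this commit: below the threshold the blob is handled entirely on the heap, above it the cost shifts to disk I/O instead of large heap allocations.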