This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5d85bacd2f398b89fdac74c408cc8878648eb3d4 Author: Benoit TELLIER <[email protected]> AuthorDate: Wed Apr 3 14:01:18 2024 +0200 [FIX] Solve Netty native buffer leak. When using the AES blob store, moving S3 object consumption off the event loop causes ByteBuf instances to be allocated on the boundedElastic scheduler. Because ByteBuf allocation relies on ThreadLocal state, this in turn instantiates far too many Netty ByteBuf arenas, which shows up as an apparent native memory leak (well over 1GB allocated, whereas usage was previously limited to ~150MB). Since we cannot rely on the previous patch, which blocked on the S3 driver's Netty event loop, and we cannot keep the current native memory over-consumption, this patch disables the intermediate file buffering when using the AESBlobStore. That file buffering was originally introduced to mitigate the lack of IMAP backpressure; now that backpressure is in place, we can afford to spend a bit more heap in the AES blob store. Note the approximate pre-sizing of the output stream, which limits allocations. 
--- .../org/apache/james/blob/aes/AESBlobStoreDAO.java | 50 ++++++---------------- 1 file changed, 12 insertions(+), 38 deletions(-) diff --git a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java index e9562c014d..cabbee8d7f 100644 --- a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java +++ b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java @@ -28,16 +28,15 @@ import java.util.Collection; import java.util.Optional; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.blob.api.ObjectStoreIOException; -import org.apache.james.util.ReactorUtils; import org.apache.james.util.Size; import org.reactivestreams.Publisher; -import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; @@ -47,7 +46,6 @@ import com.google.common.io.CountingOutputStream; import com.google.common.io.FileBackedOutputStream; import com.google.crypto.tink.subtle.AesGcmHkdfStreaming; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -60,10 +58,6 @@ public class AESBlobStoreDAO implements BlobStoreDAO { .map(s -> Size.parse(s, Size.Unit.NoUnit)) .map(s -> (int) s.asBytes()) .orElse(100 * 1024 * 1024); - public static final int FILE_THRESHOLD_DECRYPT = Optional.ofNullable(System.getProperty("james.blob.aes.file.threshold.decrypt")) - .map(s -> Size.parse(s, Size.Unit.NoUnit)) - .map(s -> (int) s.asBytes()) - .orElse(512 * 1024); private final BlobStoreDAO underlying; private final 
AesGcmHkdfStreaming streamingAead; @@ -98,34 +92,6 @@ public class AESBlobStoreDAO implements BlobStoreDAO { } } - public Mono<byte[]> decryptReactiveByteSource(ReactiveByteSource ciphertext, BlobId blobId) { - if (ciphertext.getSize() > MAXIMUM_BLOB_SIZE) { - throw new RuntimeException(blobId.asString() + " exceeded maximum blob size"); - } - - return Mono.fromCallable(() -> { - try { - FileBackedOutputStream decryptedContent = new FileBackedOutputStream(FILE_THRESHOLD_DECRYPT); - try { - CountingOutputStream countingOutputStream = new CountingOutputStream(decryptedContent); - try (InputStream ciphertextStream = ReactorUtils.toInputStream(Flux.from(ciphertext.getContent()))) { - decrypt(ciphertextStream).transferTo(countingOutputStream); - } - try (InputStream decryptedStream = decryptedContent.asByteSource().openStream()) { - return IOUtils.toByteArray(decryptedStream, countingOutputStream.getCount()); - } - } finally { - decryptedContent.reset(); - decryptedContent.close(); - } - } catch (OutOfMemoryError error) { - LoggerFactory.getLogger(AESBlobStoreDAO.class) - .error("OOM reading {}. 
Blob size read so far {} bytes.", blobId.asString(), ciphertext.getSize()); - throw error; - } - }); - } - @Override public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { try { @@ -143,9 +109,17 @@ public class AESBlobStoreDAO implements BlobStoreDAO { @Override public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) { - return Mono.from(underlying.readAsByteSource(bucketName, blobId)) - .publishOn(ReactorUtils.BLOCKING_CALL_WRAPPER) - .flatMap(reactiveByteSource -> decryptReactiveByteSource(reactiveByteSource, blobId)); + return Mono.from(underlying.readBytes(bucketName, blobId)) + .map(Throwing.function(bytes -> { + InputStream inputStream = decrypt(new ByteArrayInputStream(bytes)); + int aesPadding = 128; + try (UnsynchronizedByteArrayOutputStream outputStream = UnsynchronizedByteArrayOutputStream.builder() + .setBufferSize(bytes.length + aesPadding) + .get()) { + IOUtils.copy(inputStream, outputStream); + return outputStream.toByteArray(); + } + })); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
