This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 102cc9cacc [ENHANCEMENT] AESBlobStoreDAO::readBytes: remove 1 file copy
102cc9cacc is described below
commit 102cc9caccc6f61c9c066fa028edc5953f99d001
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Apr 2 00:08:12 2024 +0200
[ENHANCEMENT] AESBlobStoreDAO::readBytes: remove 1 file copy
---
.../org/apache/james/blob/aes/AESBlobStoreDAO.java | 22 +++++-----------------
1 file changed, 5 insertions(+), 17 deletions(-)
diff --git a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index a73495e375..b8d7478f0e 100644
--- a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -23,8 +23,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
import java.util.Collection;
import java.util.Optional;
@@ -36,6 +34,7 @@ import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.util.ReactorUtils;
import org.apache.james.util.Size;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
@@ -104,18 +103,12 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
throw new RuntimeException(blobId.asString() + " exceeded maximum
blob size");
}
- FileBackedOutputStream encryptedContent = new
FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
- WritableByteChannel channel = Channels.newChannel(encryptedContent);
-
- return Flux.from(ciphertext.getContent())
- .publishOn(Schedulers.boundedElastic())
- .doOnNext(Throwing.consumer(channel::write))
- .then(Mono.fromCallable(() -> {
+ return Mono.fromCallable(() -> {
try {
FileBackedOutputStream decryptedContent = new
FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
try {
CountingOutputStream countingOutputStream = new
CountingOutputStream(decryptedContent);
- try (InputStream ciphertextStream =
encryptedContent.asByteSource().openStream()) {
+ try (InputStream ciphertextStream =
ReactorUtils.toInputStream(Flux.from(ciphertext.getContent()))) {
decrypt(ciphertextStream).transferTo(countingOutputStream);
}
try (InputStream decryptedStream =
decryptedContent.asByteSource().openStream()) {
@@ -130,12 +123,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
.error("OOM reading {}. Blob size read so far {}
bytes.", blobId.asString(), ciphertext.getSize());
throw error;
}
- }))
- .doFinally(Throwing.consumer(any -> {
- channel.close();
- encryptedContent.reset();
- encryptedContent.close();
- }));
+ });
}
@Override
@@ -157,7 +145,7 @@ public class AESBlobStoreDAO implements BlobStoreDAO {
public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
return Mono.from(underlying.readAsByteSource(bucketName, blobId))
.flatMap(reactiveByteSource ->
decryptReactiveByteSource(reactiveByteSource, blobId))
- .subscribeOn(Schedulers.boundedElastic());
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]