This is an automated email from the ASF dual-hosted git repository.

hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6b0f830c32199bec43f7ebc7290dfd182732fbc1
Author: Quan Tran <[email protected]>
AuthorDate: Thu Apr 9 16:04:01 2026 +0700

    JAMES-4182 Implement blob metadata storage for S3
---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 65 ++++++++++++++--------
 .../blob/objectstorage/aws/S3BlobStoreDAOTest.java |  3 +-
 .../james/blob/objectstorage/aws/S3MinioTest.java  |  3 +-
 3 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index cf942a986c..1ed6a133ab 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
@@ -47,6 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSource;
 import com.google.common.io.FileBackedOutputStream;
 
@@ -54,7 +56,6 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.RetryBackoffSpec;
-import software.amazon.awssdk.core.BytesWrapper;
 import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -139,11 +140,14 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return InputStreamBlob.of(ReactorUtils.toInputStream(getObject(resolvedBucketName, blobId)
+        FluxResponse response = getObject(resolvedBucketName, blobId)
             .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
             .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
-            .block()
-            .flux));
+            .block();
+
+        return InputStreamBlob.of(
+            ReactorUtils.toInputStream(response.flux),
+            asBlobMetadata(response.sdkResponse.metadata()));
     }
 
     @Override
@@ -154,7 +158,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
             .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
             .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
             .publishOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
-            .map(res -> InputStreamBlob.of(ReactorUtils.toInputStream(res.flux)));
+            .map(res -> InputStreamBlob.of(ReactorUtils.toInputStream(res.flux), asBlobMetadata(res.sdkResponse.metadata())));
     }
 
     private static class FluxResponse {
@@ -215,9 +219,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
             .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
             .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
             .publishOn(Schedulers.parallel())
-            .map(BytesWrapper::asByteArrayUnsafe)
-            .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause)
-            .map(BytesBlob::of);
+            .map(responseBytes -> BytesBlob.of(responseBytes.asByteArrayUnsafe(), asBlobMetadata(responseBytes.response().metadata())))
+            .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause);
     }
 
     private Mono<ResponseBytes<GetObjectResponse>> getObjectBytes(BucketName bucketName, BlobId blobId) {
@@ -257,16 +260,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     @Override
     public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) {
         return switch (blob) {
-            case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload());
-            case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload());
-            case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload());
+            case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload(), bytesBlob.metadata());
+            case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload(), inputStreamBlob.metadata());
+            case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload(), byteSourceBlob.metadata());
         };
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data, BlobMetadata blobMetadata) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return buildPutObjectRequestBuilder(resolvedBucketName, data.length, blobId)
+        return buildPutObjectRequestBuilder(resolvedBucketName, data.length, blobId, blobMetadata)
             .flatMap(putObjectRequest -> Mono.fromFuture(() ->
                     client.putObject(putObjectRequest.build(), AsyncRequestBody.fromBytes(data)))
                 .retryWhen(createBucketOnRetry(resolvedBucketName))
@@ -274,32 +277,32 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
             .then();
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream, BlobMetadata blobMetadata) {
         Preconditions.checkNotNull(inputStream);
 
-        return uploadUsingFile(bucketName, blobId, inputStream);
+        return uploadUsingFile(bucketName, blobId, inputStream, blobMetadata);
     }
 
-    private Mono<Void> uploadUsingFile(BucketName bucketName, BlobId blobId, InputStream inputStream) {
+    private Mono<Void> uploadUsingFile(BucketName bucketName, BlobId blobId, InputStream inputStream, BlobMetadata blobMetadata) {
         return Mono.using(
             () -> new FileBackedOutputStream(FILE_THRESHOLD),
             fileBackedOutputStream ->
                 Mono.fromCallable(() -> IOUtils.copy(inputStream, fileBackedOutputStream))
-                    .flatMap(size -> save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size))),
+                    .flatMap(size -> save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size), blobMetadata)),
             Throwing.consumer(FileBackedOutputStream::reset),
             LAZY)
             .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
             .publishOn(Schedulers.parallel());
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content, BlobMetadata metadata) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
         return Mono.fromCallable(content::size)
             .subscribeOn(Schedulers.boundedElastic())
             .flatMap(contentLength ->
                 Mono.usingWhen(Mono.fromCallable(content::openStream).subscribeOn(Schedulers.boundedElastic()),
-                    stream -> save(resolvedBucketName, blobId, stream, contentLength),
+                    stream -> save(resolvedBucketName, blobId, stream, contentLength, metadata),
                     stream -> Mono.fromRunnable(Throwing.runnable(stream::close))))
             .retryWhen(createBucketOnRetry(resolvedBucketName))
             .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
@@ -308,20 +311,21 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
             .then();
     }
 
-    private Mono<PutObjectResponse> save(BucketName resolvedBucketName, BlobId blobId, InputStream stream, long contentLength) {
+    private Mono<PutObjectResponse> save(BucketName resolvedBucketName, BlobId blobId, InputStream stream, long contentLength, BlobMetadata metadata) {
         int chunkSize = Math.min((int) contentLength, CHUNK_SIZE);
 
-        return buildPutObjectRequestBuilder(resolvedBucketName, contentLength, blobId)
+        return buildPutObjectRequestBuilder(resolvedBucketName, contentLength, blobId, metadata)
             .flatMap(putObjectRequest -> Mono.fromFuture(() -> client.putObject(putObjectRequest.build(),
                 AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream)
                     .subscribeOn(Schedulers.boundedElastic())))));
     }
 
-    private Mono<PutObjectRequest.Builder> buildPutObjectRequestBuilder(BucketName bucketName, long contentLength, BlobId blobId) {
+    private Mono<PutObjectRequest.Builder> buildPutObjectRequestBuilder(BucketName bucketName, long contentLength, BlobId blobId, BlobMetadata blobMetadata) {
         PutObjectRequest.Builder baseBuilder = PutObjectRequest.builder()
             .bucket(bucketName.asString())
             .key(blobId.asString())
-            .contentLength(contentLength);
+            .contentLength(contentLength)
+            .metadata(asS3Metadata(blobMetadata));
 
         if (s3RequestOption.ssec().enable()) {
             return Mono.from(s3RequestOption.ssec().sseCustomerKeyFactory().get().generate(bucketName, blobId))
@@ -334,6 +338,21 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
         return Mono.just(baseBuilder);
     }
 
+    private Map<String, String> asS3Metadata(BlobMetadata metadata) {
+        return metadata.underlyingMap()
+            .entrySet()
+            .stream()
+            .collect(ImmutableMap.toImmutableMap(entry -> entry.getKey().name(), entry -> entry.getValue().value()));
+    }
+
+    private BlobMetadata asBlobMetadata(Map<String, String> metadata) {
+        return new BlobMetadata(metadata.entrySet()
+            .stream()
+            .collect(ImmutableMap.toImmutableMap(
+                entry -> new BlobMetadataName(entry.getKey()),
+                entry -> new BlobMetadataValue(entry.getValue()))));
+    }
+
     private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) {
         if (chunkSize == 0) {
             return Flux.empty();
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
index 684bb61971..0c271a9e95 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
@@ -34,6 +34,7 @@ import java.util.stream.IntStream;
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BlobStoreDAOContract;
 import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.TestBlobId;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
@@ -49,7 +50,7 @@ import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
 @ExtendWith(DockerAwsS3Extension.class)
-public class S3BlobStoreDAOTest implements BlobStoreDAOContract {
+public class S3BlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract {
     private static final BucketName fallbackBucket = BucketName.of("fallback");
 
     private static S3BlobStoreDAO testee;
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java
index dc4a4e3ba3..a58a872408 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BlobStoreDAOContract;
+import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract;
 import org.apache.james.blob.api.TestBlobId;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
@@ -47,7 +48,7 @@ import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
-public class S3MinioTest implements BlobStoreDAOContract {
+public class S3MinioTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract {
 
     @RegisterExtension
     static S3MinioExtension minoExtension = new S3MinioExtension();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to