This is an automated email from the ASF dual-hosted git repository. hqtran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 6b0f830c32199bec43f7ebc7290dfd182732fbc1 Author: Quan Tran <[email protected]> AuthorDate: Thu Apr 9 16:04:01 2026 +0700 JAMES-4182 Implement blob metadata storage for S3 --- .../blob/objectstorage/aws/S3BlobStoreDAO.java | 65 ++++++++++++++-------- .../blob/objectstorage/aws/S3BlobStoreDAOTest.java | 3 +- .../james/blob/objectstorage/aws/S3MinioTest.java | 3 +- 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java index cf942a986c..1ed6a133ab 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -47,6 +48,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import com.google.common.io.FileBackedOutputStream; @@ -54,7 +56,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.retry.RetryBackoffSpec; -import software.amazon.awssdk.core.BytesWrapper; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; @@ -139,11 +140,14 @@ public class S3BlobStoreDAO implements BlobStoreDAO { public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return InputStreamBlob.of(ReactorUtils.toInputStream(getObject(resolvedBucketName, blobId) + FluxResponse response = getObject(resolvedBucketName, blobId) .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) - .block() - .flux)); + .block(); + + return InputStreamBlob.of( + ReactorUtils.toInputStream(response.flux), + asBlobMetadata(response.sdkResponse.metadata())); } @Override @@ -154,7 +158,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO { .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) .publishOn(ReactorUtils.BLOCKING_CALL_WRAPPER) - .map(res -> InputStreamBlob.of(ReactorUtils.toInputStream(res.flux))); + .map(res -> InputStreamBlob.of(ReactorUtils.toInputStream(res.flux), asBlobMetadata(res.sdkResponse.metadata()))); } private static class FluxResponse { @@ -215,9 +219,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO { .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) .publishOn(Schedulers.parallel()) - .map(BytesWrapper::asByteArrayUnsafe) - .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause) - .map(BytesBlob::of); + .map(responseBytes -> BytesBlob.of(responseBytes.asByteArrayUnsafe(), asBlobMetadata(responseBytes.response().metadata()))) + .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause); } private Mono<ResponseBytes<GetObjectResponse>> getObjectBytes(BucketName bucketName, BlobId blobId) { @@ -257,16 +260,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO { @Override public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) { return switch (blob) { - case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload()); - case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload()); - case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload()); + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload(), bytesBlob.metadata()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload(), inputStreamBlob.metadata()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload(), byteSourceBlob.metadata()); }; } - public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + private Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data, BlobMetadata blobMetadata) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - return buildPutObjectRequestBuilder(resolvedBucketName, data.length, blobId) + return buildPutObjectRequestBuilder(resolvedBucketName, data.length, blobId, blobMetadata) .flatMap(putObjectRequest -> Mono.fromFuture(() -> client.putObject(putObjectRequest.build(), AsyncRequestBody.fromBytes(data))) .retryWhen(createBucketOnRetry(resolvedBucketName)) @@ -274,32 +277,32 @@ public class S3BlobStoreDAO implements BlobStoreDAO { .then(); } - public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + private Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream, BlobMetadata blobMetadata) { Preconditions.checkNotNull(inputStream); - return uploadUsingFile(bucketName, blobId, inputStream); + return uploadUsingFile(bucketName, blobId, inputStream, blobMetadata); } - private Mono<Void> uploadUsingFile(BucketName bucketName, BlobId blobId, InputStream inputStream) { + private Mono<Void> uploadUsingFile(BucketName bucketName, BlobId blobId, InputStream inputStream, BlobMetadata blobMetadata) { return Mono.using( () -> new FileBackedOutputStream(FILE_THRESHOLD), fileBackedOutputStream -> Mono.fromCallable(() -> IOUtils.copy(inputStream, fileBackedOutputStream)) - .flatMap(size -> save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size))), + .flatMap(size -> save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size), blobMetadata)), Throwing.consumer(FileBackedOutputStream::reset), LAZY) .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e)) .publishOn(Schedulers.parallel()); } - public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + private Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content, BlobMetadata metadata) { BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); return Mono.fromCallable(content::size) .subscribeOn(Schedulers.boundedElastic()) .flatMap(contentLength -> Mono.usingWhen(Mono.fromCallable(content::openStream).subscribeOn(Schedulers.boundedElastic()), - stream -> save(resolvedBucketName, blobId, stream, contentLength), + stream -> save(resolvedBucketName, blobId, stream, contentLength, metadata), stream -> Mono.fromRunnable(Throwing.runnable(stream::close)))) .retryWhen(createBucketOnRetry(resolvedBucketName)) .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e)) @@ -308,20 +311,21 @@ public class S3BlobStoreDAO implements BlobStoreDAO { .then(); } - private Mono<PutObjectResponse> save(BucketName resolvedBucketName, BlobId blobId, InputStream stream, long contentLength) { + private Mono<PutObjectResponse> save(BucketName resolvedBucketName, BlobId blobId, InputStream stream, long contentLength, BlobMetadata metadata) { int chunkSize = Math.min((int) contentLength, CHUNK_SIZE); - return buildPutObjectRequestBuilder(resolvedBucketName, contentLength, blobId) + return buildPutObjectRequestBuilder(resolvedBucketName, contentLength, blobId, metadata) .flatMap(putObjectRequest -> Mono.fromFuture(() -> client.putObject(putObjectRequest.build(), AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream) .subscribeOn(Schedulers.boundedElastic()))))); } - private Mono<PutObjectRequest.Builder> buildPutObjectRequestBuilder(BucketName bucketName, long contentLength, BlobId blobId) { + private Mono<PutObjectRequest.Builder> buildPutObjectRequestBuilder(BucketName bucketName, long contentLength, BlobId blobId, BlobMetadata blobMetadata) { PutObjectRequest.Builder baseBuilder = PutObjectRequest.builder() .bucket(bucketName.asString()) .key(blobId.asString()) - .contentLength(contentLength); + .contentLength(contentLength) + .metadata(asS3Metadata(blobMetadata)); if (s3RequestOption.ssec().enable()) { return Mono.from(s3RequestOption.ssec().sseCustomerKeyFactory().get().generate(bucketName, blobId)) @@ -334,6 +338,21 @@ public class S3BlobStoreDAO implements BlobStoreDAO { return Mono.just(baseBuilder); } + private Map<String, String> asS3Metadata(BlobMetadata metadata) { + return metadata.underlyingMap() + .entrySet() + .stream() + .collect(ImmutableMap.toImmutableMap(entry -> entry.getKey().name(), entry -> entry.getValue().value())); + } + + private BlobMetadata asBlobMetadata(Map<String, String> metadata) { + return new BlobMetadata(metadata.entrySet() + .stream() + .collect(ImmutableMap.toImmutableMap( + entry -> new BlobMetadataName(entry.getKey()), + entry -> new BlobMetadataValue(entry.getValue())))); + } + private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) { if (chunkSize == 0) { return Flux.empty(); diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java index 684bb61971..0c271a9e95 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java @@ -34,6 +34,7 @@ import java.util.stream.IntStream; import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BlobStoreDAOContract; import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract; import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.blob.api.TestBlobId; import org.apache.james.metrics.api.NoopGaugeRegistry; @@ -49,7 +50,7 @@ import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @ExtendWith(DockerAwsS3Extension.class) -public class S3BlobStoreDAOTest implements BlobStoreDAOContract { +public class S3BlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract { private static final BucketName fallbackBucket = BucketName.of("fallback"); private static S3BlobStoreDAO testee; diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java index dc4a4e3ba3..a58a872408 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3MinioTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BlobStoreDAOContract; +import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract; import org.apache.james.blob.api.TestBlobId; import org.apache.james.metrics.api.NoopGaugeRegistry; import org.apache.james.metrics.tests.RecordingMetricFactory; @@ -47,7 +48,7 @@ import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import software.amazon.awssdk.services.s3.model.S3Exception; -public class S3MinioTest implements BlobStoreDAOContract { +public class S3MinioTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract { @RegisterExtension static S3MinioExtension minoExtension = new S3MinioExtension(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
