This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 63b7ab0032 JAMES-4131 Configure a fallback bucket for S3 (#2719) 63b7ab0032 is described below commit 63b7ab00325ab70709a14aaea9651db92e6ac4ec Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Thu May 15 19:33:22 2025 +0700 JAMES-4131 Configure a fallback bucket for S3 (#2719) --- .../servers/partials/configure/blobstore.adoc | 4 ++ .../helm-chart/james/configs/blob.properties | 4 ++ .../sample-configuration/blob.properties | 4 ++ .../sample-configuration/blob.properties | 4 ++ .../blob.properties | 4 ++ .../sample-configuration/blob.properties | 4 ++ .../org/apache/james/blob/api/BlobStoreDAO.java | 7 -- .../blob/objectstorage/aws/BucketNameResolver.java | 2 +- .../aws/S3BlobStoreConfiguration.java | 24 +++++-- .../blob/objectstorage/aws/S3BlobStoreDAO.java | 47 +++++++++---- .../blob/objectstorage/aws/S3BlobStoreDAOTest.java | 77 ++++++++++++++++++++++ .../S3BlobStoreConfigurationReader.java | 6 +- 12 files changed, 160 insertions(+), 27 deletions(-) diff --git a/docs/modules/servers/partials/configure/blobstore.adoc b/docs/modules/servers/partials/configure/blobstore.adoc index 11f27d47d4..f169bd0fa9 100644 --- a/docs/modules/servers/partials/configure/blobstore.adoc +++ b/docs/modules/servers/partials/configure/blobstore.adoc @@ -124,6 +124,10 @@ BucketPrefix is the prefix of bucket names in James BlobStore | objectstorage.namespace | BlobStore default bucket name. Most of blobs storing in BlobStore are inside the default bucket. Unless a special case like storing blobs of deleted messages. + +| objectstorage.namespace.read.fallback +| BlobStore fallback bucket name. Allows falling back to a previously used bucket when a blob is missing from the default one. +This can be useful, for example, when migrating blobs to a new bucket.
|=== ==== SSE-C Configuration diff --git a/server/apps/distributed-app/helm-chart/james/configs/blob.properties b/server/apps/distributed-app/helm-chart/james/configs/blob.properties index 31fe02a9e5..25cc5793e1 100644 --- a/server/apps/distributed-app/helm-chart/james/configs/blob.properties +++ b/server/apps/distributed-app/helm-chart/james/configs/blob.properties @@ -41,6 +41,10 @@ cache.sizeThresholdInBytes=16 KiB # Optional, default is bucketPrefix + `default` objectstorage.namespace=james-${env:JAMES_BUCKET_SUFFIX} +# Fallback bucket name +# Optional. When configured, reads fall back to this bucket if the blob is not found in the default bucket +# objectstorage.namespace.read.fallback=james-fallback + # ========================================= ObjectStorage on S3 ============================================= # Mandatory if you choose aws-s3 storage service, S3 authentication endpoint objectstorage.s3.endPoint=${env:OS_S3_ENDPOINT} diff --git a/server/apps/distributed-app/sample-configuration/blob.properties b/server/apps/distributed-app/sample-configuration/blob.properties index d5777db794..7d16de865b 100644 --- a/server/apps/distributed-app/sample-configuration/blob.properties +++ b/server/apps/distributed-app/sample-configuration/blob.properties @@ -69,6 +69,10 @@ cache.enable=false # Optional, default is bucketPrefix + `default` # objectstorage.namespace=james +# Fallback bucket name +# Optional. When configured, reads fall back to this bucket if the blob is not found in the default bucket +# objectstorage.namespace.read.fallback=james-fallback + # ========================================= ObjectStorage on S3 ============================================= # Mandatory if you choose s3 storage service, S3 authentication endpoint objectstorage.s3.endPoint=http://s3.docker.test:8000/ diff --git a/server/apps/distributed-pop3-app/sample-configuration/blob.properties b/server/apps/distributed-pop3-app/sample-configuration/blob.properties index e452d01b24..311103c4f4 100644 ---
a/server/apps/distributed-pop3-app/sample-configuration/blob.properties +++ b/server/apps/distributed-pop3-app/sample-configuration/blob.properties @@ -57,6 +57,10 @@ cache.enable=false # Optional, default is bucketPrefix + `default` # objectstorage.namespace=james +# Fallback bucket name +# Optional. When configured, reads fall back to this bucket if the blob is not found in the default bucket +# objectstorage.namespace.read.fallback=james-fallback + # ========================================= ObjectStorage on S3 ============================================= # Mandatory if you choose s3 storage service, S3 authentication endpoint objectstorage.s3.endPoint=http://s3.docker.test:8000/ diff --git a/server/apps/postgres-app/sample-configuration-distributed/blob.properties b/server/apps/postgres-app/sample-configuration-distributed/blob.properties index 0e76163705..3f210e0acb 100644 --- a/server/apps/postgres-app/sample-configuration-distributed/blob.properties +++ b/server/apps/postgres-app/sample-configuration-distributed/blob.properties @@ -49,6 +49,10 @@ encryption.aes.enable=false # Optional, default is bucketPrefix + `default` # objectstorage.namespace=james +# Fallback bucket name +# Optional. When configured, reads fall back to this bucket if the blob is not found in the default bucket +# objectstorage.namespace.read.fallback=james-fallback + # ========================================= ObjectStorage on S3 ============================================= # Mandatory if you choose s3 storage service, S3 authentication endpoint objectstorage.s3.endPoint=http://s3.docker.test:8000/ diff --git a/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties b/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties index cb00e47fca..15549d3285 100644 --- a/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties +++ b/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties @@ -67,6 +67,10 @@ cache.enable=false # Optional, default is bucketPrefix + `default` #
objectstorage.namespace=james +# Fallback bucket name +# Optional. When configured, reads fall back to this bucket if the blob is not found in the default bucket +# objectstorage.namespace.read.fallback=james-fallback + # ========================================= ObjectStorage on S3 ============================================= # Mandatory if you choose s3 storage service, S3 authentication endpoint #objectstorage.s3.endPoint=http://s3.docker.test:8000/ diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java index 98312dd0c2..1e11679953 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java @@ -28,8 +28,6 @@ import org.reactivestreams.Publisher; import com.google.common.io.ByteSource; -import reactor.core.publisher.Mono; - public interface BlobStoreDAO { class ReactiveByteSource { private final long size; @@ -58,11 +56,6 @@ public interface BlobStoreDAO { */ InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException; - default Publisher<ReactiveByteSource> readAsByteSource(BucketName bucketName, BlobId blobId) { - return Mono.from(readBytes(bucketName, blobId)) - .map(bytes -> new ReactiveByteSource(bytes.length, Mono.just(ByteBuffer.wrap(bytes)))); - } - Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId); /** diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java index 7b13ad3a9d..7fee8a41d8 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java @@ -108,7 +108,7 @@ public class
BucketNameResolver { }).orElse(Optional.of(bucketName)); } - private boolean isNameSpace(BucketName bucketName) { + public boolean isNameSpace(BucketName bucketName) { return namespace .map(existingNamespace -> existingNamespace.equals(bucketName)) .orElse(false); diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java index 407592bfe3..7caf486be7 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java @@ -77,6 +77,7 @@ public class S3BlobStoreConfiguration { private Optional<Retry> uploadRetrySpec; private boolean ssecEnabled; private Optional<S3SSECConfiguration> ssecConfiguration = Optional.empty(); + private Optional<BucketName> fallbackBucketName; public ReadyToBuild(AwsS3AuthConfiguration specificAuthConfiguration, Region region) { this.specificAuthConfiguration = specificAuthConfiguration; @@ -89,6 +90,7 @@ public class S3BlobStoreConfiguration { this.connectionTimeout = Optional.empty(); this.inMemoryReadLimit = Optional.empty(); this.uploadRetrySpec = Optional.empty(); + this.fallbackBucketName = Optional.empty(); } public ReadyToBuild defaultBucketName(Optional<BucketName> defaultBucketName) { @@ -155,11 +157,16 @@ public class S3BlobStoreConfiguration { return this; } + public ReadyToBuild fallbackBucketName(Optional<BucketName> fallbackBucketName) { + this.fallbackBucketName = fallbackBucketName; + return this; + } + public S3BlobStoreConfiguration build() { return new S3BlobStoreConfiguration(bucketPrefix, defaultBucketName, region, specificAuthConfiguration, httpConcurrency.orElse(DEFAULT_HTTP_CONCURRENCY), inMemoryReadLimit, readTimeout, writeTimeout, connectionTimeout, 
uploadRetrySpec.orElse(DEFAULT_UPLOAD_RETRY_SPEC), - ssecEnabled, ssecConfiguration); + ssecEnabled, ssecConfiguration, fallbackBucketName); } } @@ -179,6 +186,7 @@ public class S3BlobStoreConfiguration { private final Retry uploadRetrySpec; private final boolean ssecEnabled; private final Optional<S3SSECConfiguration> ssecConfiguration; + private final Optional<BucketName> fallbackNamespace; private final Optional<Duration> readTimeout; private final Optional<Duration> writeTimeout; @@ -196,7 +204,8 @@ public class S3BlobStoreConfiguration { Optional<Duration> connectionTimeout, Retry uploadRetrySpec, boolean ssecEnabled, - Optional<S3SSECConfiguration> ssecConfiguration) { + Optional<S3SSECConfiguration> ssecConfiguration, + Optional<BucketName> fallbackNamespace) { this.bucketPrefix = bucketPrefix; this.namespace = namespace; this.region = region; @@ -209,6 +218,7 @@ public class S3BlobStoreConfiguration { this.uploadRetrySpec = uploadRetrySpec; this.ssecEnabled = ssecEnabled; this.ssecConfiguration = ssecConfiguration; + this.fallbackNamespace = fallbackNamespace; } public Optional<Long> getInMemoryReadLimit() { @@ -259,6 +269,10 @@ public class S3BlobStoreConfiguration { return ssecConfiguration; } + public Optional<BucketName> getFallbackNamespace() { + return fallbackNamespace; + } + @Override public final boolean equals(Object o) { if (o instanceof S3BlobStoreConfiguration that) { @@ -273,7 +287,8 @@ public class S3BlobStoreConfiguration { && Objects.equals(this.uploadRetrySpec, that.uploadRetrySpec) && Objects.equals(this.specificAuthConfiguration, that.specificAuthConfiguration) && Objects.equals(this.ssecEnabled, that.ssecEnabled) - && Objects.equals(this.ssecConfiguration, that.ssecConfiguration); + && Objects.equals(this.ssecConfiguration, that.ssecConfiguration) + && Objects.equals(this.fallbackNamespace, that.fallbackNamespace); } return false; } @@ -282,7 +297,7 @@ public class S3BlobStoreConfiguration { public final int hashCode() { return 
Objects.hash(namespace, bucketPrefix, httpConcurrency, specificAuthConfiguration, readTimeout, writeTimeout, connectionTimeout, uploadRetrySpec, ssecConfiguration, region, - inMemoryReadLimit, ssecEnabled); + inMemoryReadLimit, ssecEnabled, fallbackNamespace); } @Override @@ -300,6 +315,7 @@ public class S3BlobStoreConfiguration { .add("uploadRetrySpec", uploadRetrySpec) .add("ssecEnabled", ssecEnabled) .add("ssecConfiguration", ssecConfiguration) + .add("fallbackNamespace", fallbackNamespace) .toString(); } } diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java index b11ef881d5..737393b0c9 100644 --- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java +++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java @@ -55,6 +55,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.retry.RetryBackoffSpec; import software.amazon.awssdk.core.BytesWrapper; +import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.SdkPublisher; @@ -115,6 +116,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO { private final S3BlobStoreConfiguration configuration; private final BlobId.Factory blobIdFactory; private final S3RequestOption s3RequestOption; + private final java.util.Optional<BucketName> fallbackNamespace; @Inject public S3BlobStoreDAO(S3ClientFactory s3ClientFactory, @@ -125,6 +127,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO { this.client = s3ClientFactory.get(); this.blobIdFactory = blobIdFactory; this.s3RequestOption = s3RequestOption; + this.fallbackNamespace = configuration.getFallbackNamespace(); bucketNameResolver = 
BucketNameResolver.builder() .prefix(configuration.getBucketPrefix()) @@ -154,16 +157,6 @@ public class S3BlobStoreDAO implements BlobStoreDAO { .map(res -> ReactorUtils.toInputStream(res.flux)); } - @Override - public Publisher<ReactiveByteSource> readAsByteSource(BucketName bucketName, BlobId blobId) { - BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName); - - return getObject(resolvedBucketName, blobId) - .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) - .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) - .map(res -> new ReactiveByteSource(res.sdkResponse.contentLength(), res.flux)); - } - private static class FluxResponse { final CompletableFuture<FluxResponse> supportingCompletableFuture = new CompletableFuture<>(); GetObjectResponse sdkResponse; @@ -171,6 +164,17 @@ public class S3BlobStoreDAO implements BlobStoreDAO { } private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) { + return getObjectFromStore(bucketName, blobId) + .onErrorResume(e -> e instanceof NoSuchKeyException || e instanceof NoSuchBucketException, e -> { + if (fallbackNamespace.isPresent() && bucketNameResolver.isNameSpace(bucketName)) { + BucketName resolvedFallbackBucketName = bucketNameResolver.resolve(fallbackNamespace.get()); + return getObjectFromStore(resolvedFallbackBucketName, blobId); + } + return Mono.error(e); + }); + } + + private Mono<FluxResponse> getObjectFromStore(BucketName bucketName, BlobId blobId) { return buildGetObjectRequestBuilder(bucketName, blobId) .flatMap(getObjectRequestBuilder -> Mono.fromFuture(() -> client.getObject(getObjectRequestBuilder.build(), @@ -208,14 +212,29 @@ public class S3BlobStoreDAO implements BlobStoreDAO { public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { BucketName resolvedBucketName = 
bucketNameResolver.resolve(bucketName); - return buildGetObjectRequestBuilder(resolvedBucketName, blobId) - .flatMap(putObjectRequest -> Mono.fromFuture(() -> - client.getObject(putObjectRequest.build(), new MinimalCopyBytesResponseTransformer(configuration, blobId))) + return getObjectBytes(resolvedBucketName, blobId) .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e)) .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e)) .publishOn(Schedulers.parallel()) .map(BytesWrapper::asByteArrayUnsafe) - .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause)); + .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause); + } + + private Mono<ResponseBytes<GetObjectResponse>> getObjectBytes(BucketName bucketName, BlobId blobId) { + return getObjectBytesFromStore(bucketName, blobId) + .onErrorResume(e -> e instanceof NoSuchKeyException || e instanceof NoSuchBucketException, e -> { + if (fallbackNamespace.isPresent() && bucketNameResolver.isNameSpace(bucketName)) { + BucketName resolvedFallbackBucketName = bucketNameResolver.resolve(fallbackNamespace.get()); + return getObjectBytesFromStore(resolvedFallbackBucketName, blobId); + } + return Mono.error(e); + }); + } + + private Mono<ResponseBytes<GetObjectResponse>> getObjectBytesFromStore(BucketName bucketName, BlobId blobId) { + return buildGetObjectRequestBuilder(bucketName, blobId) + .flatMap(putObjectRequest -> Mono.fromFuture(() -> + client.getObject(putObjectRequest.build(), new MinimalCopyBytesResponseTransformer(configuration, blobId)))); } private Mono<GetObjectRequest.Builder> buildGetObjectRequestBuilder(BucketName bucketName, BlobId blobId) { diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java 
b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java index ad84633426..6ef690eff8 100644 --- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java +++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java @@ -24,7 +24,9 @@ import static org.apache.james.blob.objectstorage.aws.JamesS3MetricPublisher.DEF import static org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration.UPLOAD_RETRY_EXCEPTION_PREDICATE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Optional; @@ -32,6 +34,8 @@ import java.util.stream.IntStream; import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BlobStoreDAOContract; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.blob.api.TestBlobId; import org.apache.james.metrics.api.NoopGaugeRegistry; import org.apache.james.metrics.tests.RecordingMetricFactory; @@ -49,6 +53,8 @@ import reactor.util.retry.Retry; @ExtendWith(DockerAwsS3Extension.class) public class S3BlobStoreDAOTest implements BlobStoreDAOContract { + private static final BucketName fallbackBucket = BucketName.of("fallback"); + private static S3BlobStoreDAO testee; private static S3ClientFactory s3ClientFactory; @@ -65,6 +71,8 @@ public class S3BlobStoreDAOTest implements BlobStoreDAOContract { .region(dockerAwsS3.dockerAwsS3().region()) .uploadRetrySpec(Optional.of(Retry.backoff(3, java.time.Duration.ofSeconds(1)) .filter(UPLOAD_RETRY_EXCEPTION_PREDICATE))) + .defaultBucketName(BucketName.DEFAULT) + .fallbackBucketName(Optional.of(fallbackBucket)) .build(); s3ClientFactory = new 
S3ClientFactory(s3Configuration, () -> new JamesS3MetricPublisher(new RecordingMetricFactory(), new NoopGaugeRegistry(), @@ -120,4 +128,73 @@ public class S3BlobStoreDAOTest implements BlobStoreDAOContract { } })).doesNotThrowAnyException(); } + + @Test + void readShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() { + BlobStoreDAO store = testee(); + + TestBlobId blobId = new TestBlobId("id"); + Mono.from(store.save(fallbackBucket, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + + InputStream read = store.read(BucketName.DEFAULT, blobId); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + } + + @Test + void readReactiveShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() { + BlobStoreDAO store = testee(); + + TestBlobId blobId = new TestBlobId("id"); + Mono.from(store.save(fallbackBucket, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + + InputStream read = Mono.from(store.readReactive(BucketName.DEFAULT, blobId)).block(); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + } + + @Test + void readBytesShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() { + BlobStoreDAO store = testee(); + + TestBlobId blobId = new TestBlobId("id"); + Mono.from(store.save(fallbackBucket, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + + byte[] bytes = Mono.from(store.readBytes(BucketName.DEFAULT, blobId)).block(); + + assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES); + } + + @Test + void shouldNotReadOnFallbackBucketWhenNotReadingOnDefaultOne() { + BlobStoreDAO store = testee(); + + TestBlobId blobId = new TestBlobId("id"); + Mono.from(store.save(TEST_BUCKET_NAME, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + + assertThatThrownBy(() -> store.read(BucketName.DEFAULT, blobId)) + .isExactlyInstanceOf(ObjectNotFoundException.class); + } + + @Test + void shouldNotReadReactiveOnFallbackBucketWhenNotReadingOnDefaultOne() { + BlobStoreDAO store = testee(); + + TestBlobId blobId = new 
TestBlobId("id"); + Mono.from(store.save(TEST_BUCKET_NAME, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + + assertThatThrownBy(() -> Mono.from(store.readReactive(BucketName.DEFAULT, blobId)).block()) + .isExactlyInstanceOf(ObjectNotFoundException.class); + } + + @Test + void shouldNotReadBytesOnFallbackBucketWhenNotReadingOnDefaultOne() { + BlobStoreDAO store = testee(); + + TestBlobId blobId = new TestBlobId("id"); + Mono.from(store.save(TEST_BUCKET_NAME, blobId, ByteSource.wrap(ELEVEN_KILOBYTES))).block(); + + assertThatThrownBy(() -> Mono.from(store.readBytes(BucketName.DEFAULT, blobId)).block()) + .isExactlyInstanceOf(ObjectNotFoundException.class); + } } diff --git a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java index a760ad0d28..775f35de1e 100644 --- a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java +++ b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java @@ -51,6 +51,7 @@ public class S3BlobStoreConfigurationReader { private static final String OBJECTSTORAGE_S3_UPLOAD_RETRY_MAX_ATTEMPTS = "objectstorage.s3.upload.retry.maxAttempts"; private static final String OBJECTSTORAGE_S3_UPLOAD_RETRY_BACKOFF_DURATION_MILLIS = "objectstorage.s3.upload.retry.backoffDurationMillis"; private static final String OBJECTSTORAGE_S3_ENCRYPTION_SSEC_ENABLE_PROPERTY = "encryption.s3.sse.c.enable"; + private static final String OBJECTSTORAGE_NAMESPACE_READ_FALLBACK = "objectstorage.namespace.read.fallback"; public static S3BlobStoreConfiguration from(Configuration configuration) throws ConfigurationException { Optional<Integer> httpConcurrency = Optional.ofNullable(configuration.getInteger(OBJECTSTORAGE_S3_HTTP_CONCURRENCY, null)); @@ -78,6 
+79,8 @@ public class S3BlobStoreConfigurationReader { boolean ssecEnabled = configuration.getBoolean(OBJECTSTORAGE_S3_ENCRYPTION_SSEC_ENABLE_PROPERTY, false); + Optional<String> fallbackNamespace = Optional.ofNullable(configuration.getString(OBJECTSTORAGE_NAMESPACE_READ_FALLBACK, null)); + S3BlobStoreConfiguration.Builder.ReadyToBuild configBuilder = S3BlobStoreConfiguration.builder() .authConfiguration(AwsS3ConfigurationReader.from(configuration)) .region(region) @@ -88,7 +91,8 @@ public class S3BlobStoreConfigurationReader { .readTimeout(readTimeout) .writeTimeout(writeTimeout) .connectionTimeout(connectionTimeout) - .uploadRetrySpec(uploadRetrySpec); + .uploadRetrySpec(uploadRetrySpec) + .fallbackBucketName(fallbackNamespace.map(BucketName::of)); if (ssecEnabled) { configBuilder.ssecEnabled().ssecConfiguration(configuration); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org