This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 63b7ab0032 JAMES-4131 Configure a fallback bucket for S3 (#2719)
63b7ab0032 is described below

commit 63b7ab00325ab70709a14aaea9651db92e6ac4ec
Author: Rene Cordier <rcord...@linagora.com>
AuthorDate: Thu May 15 19:33:22 2025 +0700

    JAMES-4131 Configure a fallback bucket for S3 (#2719)
---
 .../servers/partials/configure/blobstore.adoc      |  4 ++
 .../helm-chart/james/configs/blob.properties       |  4 ++
 .../sample-configuration/blob.properties           |  4 ++
 .../sample-configuration/blob.properties           |  4 ++
 .../blob.properties                                |  4 ++
 .../sample-configuration/blob.properties           |  4 ++
 .../org/apache/james/blob/api/BlobStoreDAO.java    |  7 --
 .../blob/objectstorage/aws/BucketNameResolver.java |  2 +-
 .../aws/S3BlobStoreConfiguration.java              | 24 +++++--
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 47 +++++++++----
 .../blob/objectstorage/aws/S3BlobStoreDAOTest.java | 77 ++++++++++++++++++++++
 .../S3BlobStoreConfigurationReader.java            |  6 +-
 12 files changed, 160 insertions(+), 27 deletions(-)

diff --git a/docs/modules/servers/partials/configure/blobstore.adoc 
b/docs/modules/servers/partials/configure/blobstore.adoc
index 11f27d47d4..f169bd0fa9 100644
--- a/docs/modules/servers/partials/configure/blobstore.adoc
+++ b/docs/modules/servers/partials/configure/blobstore.adoc
@@ -124,6 +124,10 @@ BucketPrefix is the prefix of bucket names in James 
BlobStore
 | objectstorage.namespace
 | BlobStore default bucket name. Most of blobs storing in BlobStore are inside 
the default bucket.
 Unless a special case like storing blobs of deleted messages.
+
+| objectstorage.namespace.read.fallback
+| BlobStore fallback bucket name. Allows falling back to a previously used bucket 
when a blob is missing from the default one.
+This is useful, for example, when migrating blobs to a new bucket.
 |===
 
 ==== SSE-C Configuration
diff --git 
a/server/apps/distributed-app/helm-chart/james/configs/blob.properties 
b/server/apps/distributed-app/helm-chart/james/configs/blob.properties
index 31fe02a9e5..25cc5793e1 100644
--- a/server/apps/distributed-app/helm-chart/james/configs/blob.properties
+++ b/server/apps/distributed-app/helm-chart/james/configs/blob.properties
@@ -41,6 +41,10 @@ cache.sizeThresholdInBytes=16 KiB
 # Optional, default is bucketPrefix + `default`
 objectstorage.namespace=james-${env:JAMES_BUCKET_SUFFIX}
 
+# Fallback bucket name
+# Optional, if configured, this bucket is read when a blob is not found in the default bucket
+# objectstorage.namespace.read.fallback=james-fallback
+
 # ========================================= ObjectStorage on S3 
=============================================
 # Mandatory if you choose aws-s3 storage service, S3 authentication endpoint
 objectstorage.s3.endPoint=${env:OS_S3_ENDPOINT}
diff --git a/server/apps/distributed-app/sample-configuration/blob.properties 
b/server/apps/distributed-app/sample-configuration/blob.properties
index d5777db794..7d16de865b 100644
--- a/server/apps/distributed-app/sample-configuration/blob.properties
+++ b/server/apps/distributed-app/sample-configuration/blob.properties
@@ -69,6 +69,10 @@ cache.enable=false
 # Optional, default is bucketPrefix + `default`
 # objectstorage.namespace=james
 
+# Fallback bucket name
+# Optional, if configured, this bucket is read when a blob is not found in the default bucket
+# objectstorage.namespace.read.fallback=james-fallback
+
 # ========================================= ObjectStorage on S3 
=============================================
 # Mandatory if you choose s3 storage service, S3 authentication endpoint
 objectstorage.s3.endPoint=http://s3.docker.test:8000/
diff --git 
a/server/apps/distributed-pop3-app/sample-configuration/blob.properties 
b/server/apps/distributed-pop3-app/sample-configuration/blob.properties
index e452d01b24..311103c4f4 100644
--- a/server/apps/distributed-pop3-app/sample-configuration/blob.properties
+++ b/server/apps/distributed-pop3-app/sample-configuration/blob.properties
@@ -57,6 +57,10 @@ cache.enable=false
 # Optional, default is bucketPrefix + `default`
 # objectstorage.namespace=james
 
+# Fallback bucket name
+# Optional, if configured, this bucket is read when a blob is not found in the default bucket
+# objectstorage.namespace.read.fallback=james-fallback
+
 # ========================================= ObjectStorage on S3 
=============================================
 # Mandatory if you choose s3 storage service, S3 authentication endpoint
 objectstorage.s3.endPoint=http://s3.docker.test:8000/
diff --git 
a/server/apps/postgres-app/sample-configuration-distributed/blob.properties 
b/server/apps/postgres-app/sample-configuration-distributed/blob.properties
index 0e76163705..3f210e0acb 100644
--- a/server/apps/postgres-app/sample-configuration-distributed/blob.properties
+++ b/server/apps/postgres-app/sample-configuration-distributed/blob.properties
@@ -49,6 +49,10 @@ encryption.aes.enable=false
 # Optional, default is bucketPrefix + `default`
 # objectstorage.namespace=james
 
+# Fallback bucket name
+# Optional, if configured, this bucket is read when a blob is not found in the default bucket
+# objectstorage.namespace.read.fallback=james-fallback
+
 # ========================================= ObjectStorage on S3 
=============================================
 # Mandatory if you choose s3 storage service, S3 authentication endpoint
 objectstorage.s3.endPoint=http://s3.docker.test:8000/
diff --git 
a/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties 
b/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties
index cb00e47fca..15549d3285 100644
--- a/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties
+++ b/server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties
@@ -67,6 +67,10 @@ cache.enable=false
 # Optional, default is bucketPrefix + `default`
 # objectstorage.namespace=james
 
+# Fallback bucket name
+# Optional, if configured, this bucket is read when a blob is not found in the default bucket
+# objectstorage.namespace.read.fallback=james-fallback
+
 # ========================================= ObjectStorage on S3 
=============================================
 # Mandatory if you choose s3 storage service, S3 authentication endpoint
 #objectstorage.s3.endPoint=http://s3.docker.test:8000/
diff --git 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
index 98312dd0c2..1e11679953 100644
--- 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
+++ 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java
@@ -28,8 +28,6 @@ import org.reactivestreams.Publisher;
 
 import com.google.common.io.ByteSource;
 
-import reactor.core.publisher.Mono;
-
 public interface BlobStoreDAO {
     class ReactiveByteSource {
         private final long size;
@@ -58,11 +56,6 @@ public interface BlobStoreDAO {
      */
     InputStream read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException;
 
-    default Publisher<ReactiveByteSource> readAsByteSource(BucketName 
bucketName, BlobId blobId) {
-        return Mono.from(readBytes(bucketName, blobId))
-            .map(bytes -> new ReactiveByteSource(bytes.length, 
Mono.just(ByteBuffer.wrap(bytes))));
-    }
-
     Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId);
 
     /**
diff --git 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java
 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java
index 7b13ad3a9d..7fee8a41d8 100644
--- 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java
+++ 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java
@@ -108,7 +108,7 @@ public class BucketNameResolver {
         }).orElse(Optional.of(bucketName));
     }
 
-    private boolean isNameSpace(BucketName bucketName) {
+    public boolean isNameSpace(BucketName bucketName) {
         return namespace
             .map(existingNamespace -> existingNamespace.equals(bucketName))
             .orElse(false);
diff --git 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java
 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java
index 407592bfe3..7caf486be7 100644
--- 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java
+++ 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java
@@ -77,6 +77,7 @@ public class S3BlobStoreConfiguration {
             private Optional<Retry> uploadRetrySpec;
             private boolean ssecEnabled;
             private Optional<S3SSECConfiguration> ssecConfiguration = 
Optional.empty();
+            private Optional<BucketName> fallbackBucketName;
 
             public ReadyToBuild(AwsS3AuthConfiguration 
specificAuthConfiguration, Region region) {
                 this.specificAuthConfiguration = specificAuthConfiguration;
@@ -89,6 +90,7 @@ public class S3BlobStoreConfiguration {
                 this.connectionTimeout = Optional.empty();
                 this.inMemoryReadLimit = Optional.empty();
                 this.uploadRetrySpec = Optional.empty();
+                this.fallbackBucketName = Optional.empty();
             }
 
             public ReadyToBuild defaultBucketName(Optional<BucketName> 
defaultBucketName) {
@@ -155,11 +157,16 @@ public class S3BlobStoreConfiguration {
                 return this;
             }
 
+            public ReadyToBuild fallbackBucketName(Optional<BucketName> 
fallbackBucketName) {
+                this.fallbackBucketName = fallbackBucketName;
+                return this;
+            }
+
             public S3BlobStoreConfiguration build() {
                 return new S3BlobStoreConfiguration(bucketPrefix, 
defaultBucketName, region,
                     specificAuthConfiguration, 
httpConcurrency.orElse(DEFAULT_HTTP_CONCURRENCY),
                     inMemoryReadLimit, readTimeout, writeTimeout, 
connectionTimeout, uploadRetrySpec.orElse(DEFAULT_UPLOAD_RETRY_SPEC),
-                    ssecEnabled, ssecConfiguration);
+                    ssecEnabled, ssecConfiguration, fallbackBucketName);
             }
         }
 
@@ -179,6 +186,7 @@ public class S3BlobStoreConfiguration {
     private final Retry uploadRetrySpec;
     private final boolean ssecEnabled;
     private final Optional<S3SSECConfiguration> ssecConfiguration;
+    private final Optional<BucketName> fallbackNamespace;
 
     private final Optional<Duration> readTimeout;
     private final Optional<Duration> writeTimeout;
@@ -196,7 +204,8 @@ public class S3BlobStoreConfiguration {
                              Optional<Duration> connectionTimeout,
                              Retry uploadRetrySpec,
                              boolean ssecEnabled,
-                             Optional<S3SSECConfiguration> ssecConfiguration) {
+                             Optional<S3SSECConfiguration> ssecConfiguration,
+                             Optional<BucketName> fallbackNamespace) {
         this.bucketPrefix = bucketPrefix;
         this.namespace = namespace;
         this.region = region;
@@ -209,6 +218,7 @@ public class S3BlobStoreConfiguration {
         this.uploadRetrySpec = uploadRetrySpec;
         this.ssecEnabled = ssecEnabled;
         this.ssecConfiguration = ssecConfiguration;
+        this.fallbackNamespace = fallbackNamespace;
     }
 
     public Optional<Long> getInMemoryReadLimit() {
@@ -259,6 +269,10 @@ public class S3BlobStoreConfiguration {
         return ssecConfiguration;
     }
 
+    public Optional<BucketName> getFallbackNamespace() {
+        return fallbackNamespace;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof S3BlobStoreConfiguration that) {
@@ -273,7 +287,8 @@ public class S3BlobStoreConfiguration {
                 && Objects.equals(this.uploadRetrySpec, that.uploadRetrySpec)
                 && Objects.equals(this.specificAuthConfiguration, 
that.specificAuthConfiguration)
                 && Objects.equals(this.ssecEnabled, that.ssecEnabled)
-                && Objects.equals(this.ssecConfiguration, 
that.ssecConfiguration);
+                && Objects.equals(this.ssecConfiguration, 
that.ssecConfiguration)
+                && Objects.equals(this.fallbackNamespace, 
that.fallbackNamespace);
         }
         return false;
     }
@@ -282,7 +297,7 @@ public class S3BlobStoreConfiguration {
     public final int hashCode() {
         return Objects.hash(namespace, bucketPrefix, httpConcurrency, 
specificAuthConfiguration,
             readTimeout, writeTimeout, connectionTimeout, uploadRetrySpec, 
ssecConfiguration, region,
-            inMemoryReadLimit, ssecEnabled);
+            inMemoryReadLimit, ssecEnabled, fallbackNamespace);
     }
 
     @Override
@@ -300,6 +315,7 @@ public class S3BlobStoreConfiguration {
             .add("uploadRetrySpec", uploadRetrySpec)
             .add("ssecEnabled", ssecEnabled)
             .add("ssecConfiguration", ssecConfiguration)
+            .add("fallbackNamespace", fallbackNamespace)
             .toString();
     }
 }
diff --git 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index b11ef881d5..737393b0c9 100644
--- 
a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ 
b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -55,6 +55,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.RetryBackoffSpec;
 import software.amazon.awssdk.core.BytesWrapper;
+import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.async.SdkPublisher;
@@ -115,6 +116,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     private final S3BlobStoreConfiguration configuration;
     private final BlobId.Factory blobIdFactory;
     private final S3RequestOption s3RequestOption;
+    private final java.util.Optional<BucketName> fallbackNamespace;
 
     @Inject
     public S3BlobStoreDAO(S3ClientFactory s3ClientFactory,
@@ -125,6 +127,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
         this.client = s3ClientFactory.get();
         this.blobIdFactory = blobIdFactory;
         this.s3RequestOption = s3RequestOption;
+        this.fallbackNamespace = configuration.getFallbackNamespace();
 
         bucketNameResolver = BucketNameResolver.builder()
             .prefix(configuration.getBucketPrefix())
@@ -154,16 +157,6 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
             .map(res -> ReactorUtils.toInputStream(res.flux));
     }
 
-    @Override
-    public Publisher<ReactiveByteSource> readAsByteSource(BucketName 
bucketName, BlobId blobId) {
-        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
-
-        return getObject(resolvedBucketName, blobId)
-            .onErrorMap(NoSuchBucketException.class, e -> new 
ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
-            .onErrorMap(NoSuchKeyException.class, e -> new 
ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + 
resolvedBucketName.asString(), e))
-            .map(res -> new 
ReactiveByteSource(res.sdkResponse.contentLength(), res.flux));
-    }
-
     private static class FluxResponse {
         final CompletableFuture<FluxResponse> supportingCompletableFuture = 
new CompletableFuture<>();
         GetObjectResponse sdkResponse;
@@ -171,6 +164,17 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     }
 
     private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) 
{
+        return getObjectFromStore(bucketName, blobId)
+            .onErrorResume(e -> e instanceof NoSuchKeyException || e 
instanceof NoSuchBucketException, e -> {
+                if (fallbackNamespace.isPresent() && 
bucketNameResolver.isNameSpace(bucketName)) {
+                    BucketName resolvedFallbackBucketName = 
bucketNameResolver.resolve(fallbackNamespace.get());
+                    return getObjectFromStore(resolvedFallbackBucketName, 
blobId);
+                }
+                return Mono.error(e);
+            });
+    }
+
+    private Mono<FluxResponse> getObjectFromStore(BucketName bucketName, 
BlobId blobId) {
         return buildGetObjectRequestBuilder(bucketName, blobId)
             .flatMap(getObjectRequestBuilder -> Mono.fromFuture(() ->
                     client.getObject(getObjectRequestBuilder.build(),
@@ -208,14 +212,29 @@ public class S3BlobStoreDAO implements BlobStoreDAO {
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return buildGetObjectRequestBuilder(resolvedBucketName, blobId)
-            .flatMap(putObjectRequest -> Mono.fromFuture(() ->
-                    client.getObject(putObjectRequest.build(), new 
MinimalCopyBytesResponseTransformer(configuration, blobId)))
+        return getObjectBytes(resolvedBucketName, blobId)
                 .onErrorMap(NoSuchBucketException.class, e -> new 
ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
                 .onErrorMap(NoSuchKeyException.class, e -> new 
ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + 
resolvedBucketName.asString(), e))
                 .publishOn(Schedulers.parallel())
                 .map(BytesWrapper::asByteArrayUnsafe)
-                .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, 
Throwable::getCause));
+                .onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, 
Throwable::getCause);
+    }
+
+    private Mono<ResponseBytes<GetObjectResponse>> getObjectBytes(BucketName 
bucketName, BlobId blobId) {
+        return getObjectBytesFromStore(bucketName, blobId)
+                .onErrorResume(e -> e instanceof NoSuchKeyException || e 
instanceof NoSuchBucketException, e -> {
+                    if (fallbackNamespace.isPresent() && 
bucketNameResolver.isNameSpace(bucketName)) {
+                        BucketName resolvedFallbackBucketName = 
bucketNameResolver.resolve(fallbackNamespace.get());
+                        return 
getObjectBytesFromStore(resolvedFallbackBucketName, blobId);
+                    }
+                    return Mono.error(e);
+                });
+    }
+
+    private Mono<ResponseBytes<GetObjectResponse>> 
getObjectBytesFromStore(BucketName bucketName, BlobId blobId) {
+        return buildGetObjectRequestBuilder(bucketName, blobId)
+            .flatMap(putObjectRequest -> Mono.fromFuture(() ->
+                client.getObject(putObjectRequest.build(), new 
MinimalCopyBytesResponseTransformer(configuration, blobId))));
     }
 
     private Mono<GetObjectRequest.Builder> 
buildGetObjectRequestBuilder(BucketName bucketName, BlobId blobId) {
diff --git 
a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
 
b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
index ad84633426..6ef690eff8 100644
--- 
a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
+++ 
b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
@@ -24,7 +24,9 @@ import static 
org.apache.james.blob.objectstorage.aws.JamesS3MetricPublisher.DEF
 import static 
org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration.UPLOAD_RETRY_EXCEPTION_PREDICATE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
@@ -32,6 +34,8 @@ import java.util.stream.IntStream;
 
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BlobStoreDAOContract;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.TestBlobId;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
@@ -49,6 +53,8 @@ import reactor.util.retry.Retry;
 
 @ExtendWith(DockerAwsS3Extension.class)
 public class S3BlobStoreDAOTest implements BlobStoreDAOContract {
+    private static final BucketName fallbackBucket = BucketName.of("fallback");
+
     private static S3BlobStoreDAO testee;
     private static S3ClientFactory s3ClientFactory;
 
@@ -65,6 +71,8 @@ public class S3BlobStoreDAOTest implements 
BlobStoreDAOContract {
             .region(dockerAwsS3.dockerAwsS3().region())
             .uploadRetrySpec(Optional.of(Retry.backoff(3, 
java.time.Duration.ofSeconds(1))
                 .filter(UPLOAD_RETRY_EXCEPTION_PREDICATE)))
+            .defaultBucketName(BucketName.DEFAULT)
+            .fallbackBucketName(Optional.of(fallbackBucket))
             .build();
 
         s3ClientFactory = new S3ClientFactory(s3Configuration, () -> new 
JamesS3MetricPublisher(new RecordingMetricFactory(), new NoopGaugeRegistry(),
@@ -120,4 +128,73 @@ public class S3BlobStoreDAOTest implements 
BlobStoreDAOContract {
                 }
             })).doesNotThrowAnyException();
     }
+
+    @Test
+    void readShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() {
+        BlobStoreDAO store = testee();
+
+        TestBlobId blobId = new TestBlobId("id");
+        Mono.from(store.save(fallbackBucket, blobId, 
ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+
+        InputStream read = store.read(BucketName.DEFAULT, blobId);
+
+        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(ELEVEN_KILOBYTES));
+    }
+
+    @Test
+    void readReactiveShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() {
+        BlobStoreDAO store = testee();
+
+        TestBlobId blobId = new TestBlobId("id");
+        Mono.from(store.save(fallbackBucket, blobId, 
ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+
+        InputStream read = Mono.from(store.readReactive(BucketName.DEFAULT, 
blobId)).block();
+
+        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(ELEVEN_KILOBYTES));
+    }
+
+    @Test
+    void readBytesShouldFallbackToDefinedBucketWhenFailingOnDefaultOne() {
+        BlobStoreDAO store = testee();
+
+        TestBlobId blobId = new TestBlobId("id");
+        Mono.from(store.save(fallbackBucket, blobId, 
ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+
+        byte[] bytes = Mono.from(store.readBytes(BucketName.DEFAULT, 
blobId)).block();
+
+        assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    void shouldNotReadOnFallbackBucketWhenNotReadingOnDefaultOne() {
+        BlobStoreDAO store = testee();
+
+        TestBlobId blobId = new TestBlobId("id");
+        Mono.from(store.save(TEST_BUCKET_NAME, blobId, 
ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+
+        assertThatThrownBy(() -> store.read(BucketName.DEFAULT, blobId))
+            .isExactlyInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    void shouldNotReadReactiveOnFallbackBucketWhenNotReadingOnDefaultOne() {
+        BlobStoreDAO store = testee();
+
+        TestBlobId blobId = new TestBlobId("id");
+        Mono.from(store.save(TEST_BUCKET_NAME, blobId, 
ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+
+        assertThatThrownBy(() -> 
Mono.from(store.readReactive(BucketName.DEFAULT, blobId)).block())
+            .isExactlyInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    void shouldNotReadBytesOnFallbackBucketWhenNotReadingOnDefaultOne() {
+        BlobStoreDAO store = testee();
+
+        TestBlobId blobId = new TestBlobId("id");
+        Mono.from(store.save(TEST_BUCKET_NAME, blobId, 
ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+
+        assertThatThrownBy(() -> Mono.from(store.readBytes(BucketName.DEFAULT, 
blobId)).block())
+            .isExactlyInstanceOf(ObjectNotFoundException.class);
+    }
 }
diff --git 
a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java
 
b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java
index a760ad0d28..775f35de1e 100644
--- 
a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java
+++ 
b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java
@@ -51,6 +51,7 @@ public class S3BlobStoreConfigurationReader {
     private static final String OBJECTSTORAGE_S3_UPLOAD_RETRY_MAX_ATTEMPTS = 
"objectstorage.s3.upload.retry.maxAttempts";
     private static final String 
OBJECTSTORAGE_S3_UPLOAD_RETRY_BACKOFF_DURATION_MILLIS = 
"objectstorage.s3.upload.retry.backoffDurationMillis";
     private static final String 
OBJECTSTORAGE_S3_ENCRYPTION_SSEC_ENABLE_PROPERTY = "encryption.s3.sse.c.enable";
+    private static final String OBJECTSTORAGE_NAMESPACE_READ_FALLBACK = 
"objectstorage.namespace.read.fallback";
 
     public static S3BlobStoreConfiguration from(Configuration configuration) 
throws ConfigurationException {
         Optional<Integer> httpConcurrency = 
Optional.ofNullable(configuration.getInteger(OBJECTSTORAGE_S3_HTTP_CONCURRENCY, 
null));
@@ -78,6 +79,8 @@ public class S3BlobStoreConfigurationReader {
 
         boolean ssecEnabled = 
configuration.getBoolean(OBJECTSTORAGE_S3_ENCRYPTION_SSEC_ENABLE_PROPERTY, 
false);
 
+        Optional<String> fallbackNamespace = 
Optional.ofNullable(configuration.getString(OBJECTSTORAGE_NAMESPACE_READ_FALLBACK,
 null));
+
         S3BlobStoreConfiguration.Builder.ReadyToBuild configBuilder = 
S3BlobStoreConfiguration.builder()
             .authConfiguration(AwsS3ConfigurationReader.from(configuration))
             .region(region)
@@ -88,7 +91,8 @@ public class S3BlobStoreConfigurationReader {
             .readTimeout(readTimeout)
             .writeTimeout(writeTimeout)
             .connectionTimeout(connectionTimeout)
-            .uploadRetrySpec(uploadRetrySpec);
+            .uploadRetrySpec(uploadRetrySpec)
+            .fallbackBucketName(fallbackNamespace.map(BucketName::of));
 
         if (ssecEnabled) {
             configBuilder.ssecEnabled().ssecConfiguration(configuration);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to