This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b5fb2f7174e177944336953afa4fb9e09f615c3e
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Thu Jul 4 17:09:51 2019 +0700

    JAMES-2806 ObjectStorage S3 retry save() on non existing bucket
---
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 72 +++++++++++------
 .../ObjectStorageBlobsDAOAWSTest.java              | 90 ++++++++++++++++++++++
 2 files changed, 137 insertions(+), 25 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index 1b52c85..4884dd5 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -21,6 +21,7 @@ package org.apache.james.blob.objectstorage.aws;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
@@ -45,7 +46,6 @@ import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
 
-import com.amazonaws.AmazonClientException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -53,13 +53,19 @@ import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.retry.PredefinedRetryPolicies;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
+import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Module;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.retry.Retry;
+
 public class AwsS3ObjectStorage {
 
     private static final Iterable<Module> JCLOUDS_MODULES = ImmutableSet.of(new SLF4JLoggingModule());
@@ -82,7 +88,7 @@ public class AwsS3ObjectStorage {
 
     @Inject
     @VisibleForTesting
-    AwsS3ObjectStorage() {
+    public AwsS3ObjectStorage() {
         executorService = Executors.newFixedThreadPool(MAX_THREADS, NamedThreadFactory.withClassName(AwsS3ObjectStorage.class));
     }
 
@@ -125,6 +131,12 @@ public class AwsS3ObjectStorage {
     }
 
     private static class AwsS3BlobPutter implements BlobPutter {
+
+        private static final int NOT_FOUND_STATUS_CODE = 404;
+        private static final String BUCKET_NOT_FOUND_ERROR_CODE = "NoSuchBucket";
+        private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
+        private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
+
         private final AwsS3AuthConfiguration configuration;
         private final ExecutorService executorService;
 
@@ -135,14 +147,14 @@ public class AwsS3ObjectStorage {
 
         @Override
         public void putDirectly(BucketName bucketName, Blob blob) {
-            writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY));
+            writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY).block());
         }
 
         @Override
         public BlobId putAndComputeId(BucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
             Consumer<File> putChangedBlob = (file) -> {
                 initialBlob.getMetadata().setName(blobIdSupplier.get().asString());
-                putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY);
+                putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY).block();
             };
             writeFileAndAct(initialBlob, putChangedBlob);
             return blobIdSupplier.get();
@@ -163,30 +175,40 @@ public class AwsS3ObjectStorage {
             }
         }
 
-        private void putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) {
-            try {
-                put(bucketName, configuration, blob, file);
-            } catch (RuntimeException e) {
-                if (tried < MAX_RETRY_ON_EXCEPTION) {
-                    putWithRetry(bucketName, configuration, blob, file, tried + 1);
-                } else {
-                    throw e;
-                }
-            }
+        private Mono<Void> putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) {
+            return Mono.<Void>fromRunnable(Throwing.runnable(() -> put(bucketName, configuration, blob, file)).sneakyThrow())
+                .publishOn(Schedulers.elastic())
+                .retryWhen(Retry
+                    .<Void>onlyIf(retryContext -> needToCreateBucket(retryContext.exception()))
+                    .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
+                    .withBackoffScheduler(Schedulers.elastic())
+                    .retryMax(MAX_RETRY_ON_EXCEPTION)
+                    .doOnRetry(retryContext -> createBucket(bucketName, configuration)));
         }
 
-        private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) {
-            try {
-                PutObjectRequest request = new PutObjectRequest(bucketName.asString(),
-                        blob.getMetadata().getName(),
-                        file);
-
-                getTransferManager(configuration)
-                        .upload(request)
-                        .waitForUploadResult();
-            } catch (AmazonClientException | InterruptedException e) {
-                throw new RuntimeException(e);
+        private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) throws InterruptedException {
+            PutObjectRequest request = new PutObjectRequest(bucketName.asString(),
+                blob.getMetadata().getName(),
+                file);
+
+            getTransferManager(configuration)
+                .upload(request)
+                .waitForUploadResult();
+        }
+
+        private void createBucket(BucketName bucketName, AwsS3AuthConfiguration configuration) {
+            getS3Client(configuration, getClientConfiguration())
+                .createBucket(bucketName.asString());
+        }
+
+        private boolean needToCreateBucket(Throwable th) {
+            if (th instanceof AmazonS3Exception) {
+                AmazonS3Exception s3Exception = (AmazonS3Exception) th;
+                return NOT_FOUND_STATUS_CODE == s3Exception.getStatusCode()
+                    && BUCKET_NOT_FOUND_ERROR_CODE.equals(s3Exception.getErrorCode());
             }
+
+            return false;
         }
 
         private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) {
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOAWSTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOAWSTest.java
new file mode 100644
index 0000000..716ff23
--- /dev/null
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOAWSTest.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.objectstorage;
+
+import java.util.UUID;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketBlobStoreContract;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.MetricableBlobStore;
+import org.apache.james.blob.api.MetricableBlobStoreContract;
+import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration;
+import org.apache.james.blob.objectstorage.aws.AwsS3ObjectStorage;
+import org.apache.james.blob.objectstorage.aws.DockerAwsS3Container;
+import org.apache.james.blob.objectstorage.aws.DockerAwsS3Extension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(DockerAwsS3Extension.class)
+public class ObjectStorageBlobsDAOAWSTest implements MetricableBlobStoreContract, BucketBlobStoreContract {
+
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+
+    private BucketName defaultBucketName;
+    private org.jclouds.blobstore.BlobStore blobStore;
+    private ObjectStorageBlobsDAO objectStorageBlobsDAO;
+    private AwsS3ObjectStorage awsS3ObjectStorage;
+    private AwsS3AuthConfiguration configuration;
+    private BlobStore testee;
+
+    @BeforeEach
+    void setUp(DockerAwsS3Container dockerAwsS3) {
+        awsS3ObjectStorage = new AwsS3ObjectStorage();
+        defaultBucketName = BucketName.of(UUID.randomUUID().toString());
+        configuration = AwsS3AuthConfiguration.builder()
+            .endpoint(dockerAwsS3.getEndpoint())
+            .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
+            .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
+            .build();
+
+        ObjectStorageBlobsDAOBuilder.ReadyToBuild builder = ObjectStorageBlobsDAO
+            .builder(configuration)
+            .defaultBucketName(defaultBucketName)
+            .blobIdFactory(BLOB_ID_FACTORY)
+            .blobPutter(awsS3ObjectStorage.putBlob(configuration));
+
+        blobStore = builder.getSupplier().get();
+        objectStorageBlobsDAO = builder.build();
+        objectStorageBlobsDAO.createBucket(defaultBucketName).block();
+        testee = new MetricableBlobStore(metricsTestExtension.getMetricFactory(), objectStorageBlobsDAO);
+    }
+
+    @AfterEach
+    void tearDown() {
+        blobStore.deleteContainer(defaultBucketName.asString());
+        blobStore.deleteContainer(CUSTOM.asString());
+        blobStore.getContext().close();
+        awsS3ObjectStorage.tearDown();
+    }
+
+    @Override
+    public BlobStore testee() {
+        return testee;
+    }
+
+    @Override
+    public BlobId.Factory blobIdFactory() {
+        return BLOB_ID_FACTORY;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to