This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b5fb2f7174e177944336953afa4fb9e09f615c3e Author: Tran Tien Duc <dt...@linagora.com> AuthorDate: Thu Jul 4 17:09:51 2019 +0700 JAMES-2806 ObjectStorage S3 retry save() on non existing bucket --- .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 72 +++++++++++------ .../ObjectStorageBlobsDAOAWSTest.java | 90 ++++++++++++++++++++++ 2 files changed, 137 insertions(+), 25 deletions(-) diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java index 1b52c85..4884dd5 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java @@ -21,6 +21,7 @@ package org.apache.james.blob.objectstorage.aws; import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.Optional; import java.util.Properties; import java.util.UUID; @@ -45,7 +46,6 @@ import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; -import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -53,13 +53,19 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.retry.PredefinedRetryPolicies; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; +import com.github.fge.lambdas.Throwing; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.inject.Module; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.retry.Retry; + public class AwsS3ObjectStorage { private static final Iterable<Module> JCLOUDS_MODULES = ImmutableSet.of(new SLF4JLoggingModule()); @@ -82,7 +88,7 @@ public class AwsS3ObjectStorage { @Inject @VisibleForTesting - AwsS3ObjectStorage() { + public AwsS3ObjectStorage() { executorService = Executors.newFixedThreadPool(MAX_THREADS, NamedThreadFactory.withClassName(AwsS3ObjectStorage.class)); } @@ -125,6 +131,12 @@ public class AwsS3ObjectStorage { } private static class AwsS3BlobPutter implements BlobPutter { + + private static final int NOT_FOUND_STATUS_CODE = 404; + private static final String BUCKET_NOT_FOUND_ERROR_CODE = "NoSuchBucket"; + private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100); + private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); + private final AwsS3AuthConfiguration configuration; private final ExecutorService executorService; @@ -135,14 +147,14 @@ public class AwsS3ObjectStorage { @Override public void putDirectly(BucketName bucketName, Blob blob) { - writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY)); + writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY).block()); } @Override public BlobId putAndComputeId(BucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) { Consumer<File> putChangedBlob = (file) -> { initialBlob.getMetadata().setName(blobIdSupplier.get().asString()); - putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY); + putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY).block(); }; writeFileAndAct(initialBlob, putChangedBlob); return blobIdSupplier.get(); @@ -163,30 +175,40 @@ public class AwsS3ObjectStorage { } } - private void putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) { - try { - put(bucketName, configuration, blob, file); - } catch (RuntimeException e) { - if (tried < MAX_RETRY_ON_EXCEPTION) { - putWithRetry(bucketName, configuration, blob, file, tried + 1); - } else { - throw e; - } - } + private Mono<Void> putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) { + return Mono.<Void>fromRunnable(Throwing.runnable(() -> put(bucketName, configuration, blob, file)).sneakyThrow()) + .publishOn(Schedulers.elastic()) + .retryWhen(Retry + .<Void>onlyIf(retryContext -> needToCreateBucket(retryContext.exception())) + .exponentialBackoff(FIRST_BACK_OFF, FOREVER) + .withBackoffScheduler(Schedulers.elastic()) + .retryMax(MAX_RETRY_ON_EXCEPTION) + .doOnRetry(retryContext -> createBucket(bucketName, configuration))); } - private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) { - try { - PutObjectRequest request = new PutObjectRequest(bucketName.asString(), - blob.getMetadata().getName(), - file); - - getTransferManager(configuration) - .upload(request) - .waitForUploadResult(); - } catch (AmazonClientException | InterruptedException e) { - throw new RuntimeException(e); + private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) throws InterruptedException { + PutObjectRequest request = new PutObjectRequest(bucketName.asString(), + blob.getMetadata().getName(), + file); + + getTransferManager(configuration) + .upload(request) + .waitForUploadResult(); + } + + private void createBucket(BucketName bucketName, AwsS3AuthConfiguration configuration) { + getS3Client(configuration, getClientConfiguration()) + .createBucket(bucketName.asString()); + } + + private boolean needToCreateBucket(Throwable th) { + if (th instanceof AmazonS3Exception) { + AmazonS3Exception s3Exception = (AmazonS3Exception) th; + return NOT_FOUND_STATUS_CODE == s3Exception.getStatusCode() + && BUCKET_NOT_FOUND_ERROR_CODE.equals(s3Exception.getErrorCode()); } + + return false; } private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) { diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOAWSTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOAWSTest.java new file mode 100644 index 0000000..716ff23 --- /dev/null +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOAWSTest.java @@ -0,0 +1,90 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.objectstorage; + +import java.util.UUID; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BucketBlobStoreContract; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.api.MetricableBlobStore; +import org.apache.james.blob.api.MetricableBlobStoreContract; +import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration; +import org.apache.james.blob.objectstorage.aws.AwsS3ObjectStorage; +import org.apache.james.blob.objectstorage.aws.DockerAwsS3Container; +import org.apache.james.blob.objectstorage.aws.DockerAwsS3Extension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(DockerAwsS3Extension.class) +public class ObjectStorageBlobsDAOAWSTest implements MetricableBlobStoreContract, BucketBlobStoreContract { + + private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + + private BucketName defaultBucketName; + private org.jclouds.blobstore.BlobStore blobStore; + private ObjectStorageBlobsDAO objectStorageBlobsDAO; + private AwsS3ObjectStorage awsS3ObjectStorage; + private AwsS3AuthConfiguration configuration; + private BlobStore testee; + + @BeforeEach + void setUp(DockerAwsS3Container dockerAwsS3) { + awsS3ObjectStorage = new AwsS3ObjectStorage(); + defaultBucketName = BucketName.of(UUID.randomUUID().toString()); + configuration = AwsS3AuthConfiguration.builder() + .endpoint(dockerAwsS3.getEndpoint()) + .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID) + .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY) + .build(); + + ObjectStorageBlobsDAOBuilder.ReadyToBuild builder = ObjectStorageBlobsDAO + .builder(configuration) + .defaultBucketName(defaultBucketName) + .blobIdFactory(BLOB_ID_FACTORY) + .blobPutter(awsS3ObjectStorage.putBlob(configuration)); + + blobStore = builder.getSupplier().get(); + objectStorageBlobsDAO = builder.build(); + objectStorageBlobsDAO.createBucket(defaultBucketName).block(); + testee = new MetricableBlobStore(metricsTestExtension.getMetricFactory(), objectStorageBlobsDAO); + } + + @AfterEach + void tearDown() { + blobStore.deleteContainer(defaultBucketName.asString()); + blobStore.deleteContainer(CUSTOM.asString()); + blobStore.getContext().close(); + awsS3ObjectStorage.tearDown(); + } + + @Override + public BlobStore testee() { + return testee; + } + + @Override + public BlobId.Factory blobIdFactory() { + return BLOB_ID_FACTORY; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org