This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 514cc902da64e982d105fba844edc2597287cd13 Author: Gautier DI FOLCO <gdifo...@linagora.com> AuthorDate: Fri Jul 5 11:34:22 2019 +0200 JAMES-2815 Stop storing twice in AWS S3 BlobStore --- .../{PutBlobFunction.java => BlobPutter.java} | 11 +- .../blob/objectstorage/ObjectStorageBlobsDAO.java | 33 +--- .../ObjectStorageBlobsDAOBuilder.java | 14 +- .../objectstorage/StreamCompatibleBlobPutter.java | 55 +++++++ .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 169 ++++++++++++--------- .../aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java | 2 +- .../ObjectStorageDependenciesModule.java | 6 +- 7 files changed, 181 insertions(+), 109 deletions(-) diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PutBlobFunction.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java similarity index 81% rename from server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PutBlobFunction.java rename to server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java index 6bae45d..a48c8f2 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PutBlobFunction.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java @@ -19,6 +19,9 @@ package org.apache.james.blob.objectstorage; +import java.util.function.Supplier; + +import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BucketName; import org.jclouds.blobstore.domain.Blob; @@ -31,8 +34,10 @@ import org.jclouds.blobstore.domain.Blob; * whereas you don't need one by using the S3 client. * */ -@FunctionalInterface -public interface PutBlobFunction { - void putBlob(BucketName bucketName, Blob blob); +public interface BlobPutter { + + void putDirectly(BucketName bucketName, Blob blob); + + BlobId putAndComputeId(BucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier); } diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java index cdfc166..394529d 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java @@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage; import java.io.IOException; import java.io.InputStream; import java.util.Optional; +import java.util.function.Supplier; import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; @@ -34,7 +35,6 @@ import org.apache.james.blob.objectstorage.swift.SwiftKeystone2ObjectStorage; import org.apache.james.blob.objectstorage.swift.SwiftKeystone3ObjectStorage; import org.apache.james.blob.objectstorage.swift.SwiftTempAuthObjectStorage; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.CopyOptions; import org.jclouds.domain.Location; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,17 +55,17 @@ public class ObjectStorageBlobsDAO implements BlobStore { private final BucketName defaultBucketName; private final org.jclouds.blobstore.BlobStore blobStore; - private final PutBlobFunction putBlobFunction; + private final BlobPutter blobPutter; private final PayloadCodec payloadCodec; ObjectStorageBlobsDAO(BucketName defaultBucketName, BlobId.Factory blobIdFactory, org.jclouds.blobstore.BlobStore blobStore, - PutBlobFunction putBlobFunction, + BlobPutter blobPutter, PayloadCodec payloadCodec) { this.blobIdFactory = blobIdFactory; this.defaultBucketName = defaultBucketName; this.blobStore = blobStore; - this.putBlobFunction = putBlobFunction; + this.blobPutter = blobPutter; this.payloadCodec = payloadCodec; } @@ -103,7 +103,7 @@ public class ObjectStorageBlobsDAO implements BlobStore { .contentLength(payload.getLength().orElse(new Long(data.length))) .build(); - return save(bucketName, blob) + return Mono.fromRunnable(() -> blobPutter.putDirectly(bucketName, blob)) .thenReturn(blobId); } @@ -112,31 +112,14 @@ public class ObjectStorageBlobsDAO implements BlobStore { Preconditions.checkNotNull(data); BlobId tmpId = blobIdFactory.randomId(); - return save(bucketName, data, tmpId) - .flatMap(id -> updateBlobId(bucketName, tmpId, id)); - } - - private Mono<BlobId> updateBlobId(BucketName bucketName, BlobId from, BlobId to) { - String bucketNameAsString = bucketName.asString(); - return Mono - .fromCallable(() -> blobStore.copyBlob(bucketNameAsString, from.asString(), bucketNameAsString, to.asString(), CopyOptions.NONE)) - .then(Mono.fromRunnable(() -> blobStore.removeBlob(bucketNameAsString, from.asString()))) - .thenReturn(to); - } - - private Mono<BlobId> save(BucketName bucketName, InputStream data, BlobId id) { HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data); Payload payload = payloadCodec.write(hashingInputStream); - Blob blob = blobStore.blobBuilder(id.asString()) + Blob blob = blobStore.blobBuilder(tmpId.asString()) .payload(payload.getPayload()) .build(); - return save(bucketName, blob) - .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString()))); - } - - private Mono<Void> save(BucketName bucketName, Blob blob) { - return Mono.fromRunnable(() -> putBlobFunction.putBlob(bucketName, blob)); + Supplier<BlobId> blobIdSupplier = () -> blobIdFactory.from(hashingInputStream.hash().toString()); + return Mono.fromRunnable(() -> blobPutter.putAndComputeId(bucketName, blob, blobIdSupplier)); } @Override diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java index 11cef30..9122c29 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java @@ -51,14 +51,14 @@ public class ObjectStorageBlobsDAOBuilder { private final BucketName defaultBucketName; private final BlobId.Factory blobIdFactory; private Optional<PayloadCodec> payloadCodec; - private Optional<PutBlobFunction> putBlob; + private Optional<BlobPutter> blobPutter; public ReadyToBuild(Supplier<BlobStore> supplier, BlobId.Factory blobIdFactory, BucketName defaultBucketName) { this.blobIdFactory = blobIdFactory; this.defaultBucketName = defaultBucketName; this.payloadCodec = Optional.empty(); this.supplier = supplier; - this.putBlob = Optional.empty(); + this.blobPutter = Optional.empty(); } public ReadyToBuild payloadCodec(PayloadCodec payloadCodec) { @@ -71,8 +71,8 @@ public class ObjectStorageBlobsDAOBuilder { return this; } - public ReadyToBuild putBlob(Optional<PutBlobFunction> putBlob) { - this.putBlob = putBlob; + public ReadyToBuild blobPutter(Optional<BlobPutter> blobPutter) { + this.blobPutter = blobPutter; return this; } @@ -85,12 +85,12 @@ public class ObjectStorageBlobsDAOBuilder { return new ObjectStorageBlobsDAO(defaultBucketName, blobIdFactory, blobStore, - putBlob.orElse(defaultPutBlob(blobStore)), + blobPutter.orElseGet(() -> defaultPutBlob(blobStore)), payloadCodec.orElse(PayloadCodec.DEFAULT_CODEC)); } - private PutBlobFunction defaultPutBlob(BlobStore blobStore) { - return (bucketName, blob) -> blobStore.putBlob(bucketName.asString(), blob); + private BlobPutter defaultPutBlob(BlobStore blobStore) { + return new StreamCompatibleBlobPutter(blobStore); } @VisibleForTesting diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java new file mode 100644 index 0000000..f01bc34 --- /dev/null +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.james.blob.objectstorage; + +import java.util.function.Supplier; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BucketName; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.CopyOptions; + +public class StreamCompatibleBlobPutter implements BlobPutter { + private final org.jclouds.blobstore.BlobStore blobStore; + + public StreamCompatibleBlobPutter(BlobStore blobStore) { + this.blobStore = blobStore; + } + + @Override + public void putDirectly(BucketName bucketName, Blob blob) { + blobStore.putBlob(bucketName.asString(), blob); + } + + @Override + public BlobId putAndComputeId(BucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) { + putDirectly(bucketName, initialBlob); + BlobId finalId = blobIdSupplier.get(); + updateBlobId(bucketName, initialBlob.getMetadata().getName(), finalId.asString()); + return finalId; + } + + private void updateBlobId(BucketName bucketName, String from, String to) { + String bucketNameAsString = bucketName.asString(); + blobStore.copyBlob(bucketNameAsString, from, bucketNameAsString, to, CopyOptions.NONE); + blobStore.removeBlob(bucketNameAsString, from); + } +} \ No newline at end of file diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java index 3d4a3b3..1b52c85 100644 --- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java @@ -26,15 +26,17 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.PreDestroy; import javax.inject.Inject; import org.apache.commons.io.FileUtils; +import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.objectstorage.BlobPutter; import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAOBuilder; -import org.apache.james.blob.objectstorage.PutBlobFunction; import org.apache.james.util.Size; import org.apache.james.util.concurrent.NamedThreadFactory; import org.jclouds.ContextBuilder; @@ -93,75 +95,8 @@ public class AwsS3ObjectStorage { return ObjectStorageBlobsDAOBuilder.forBlobStore(new BlobStoreBuilder(configuration)); } - public Optional<PutBlobFunction> putBlob(AwsS3AuthConfiguration configuration) { - return Optional.of((bucketName, blob) -> { - File file = null; - try { - file = File.createTempFile(UUID.randomUUID().toString(), ".tmp"); - FileUtils.copyToFile(blob.getPayload().openStream(), file); - putWithRetry(bucketName, configuration, blob, file, FIRST_TRY); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if (file != null) { - FileUtils.deleteQuietly(file); - } - } - }); - } - - private void putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) { - try { - put(bucketName, configuration, blob, file); - } catch (RuntimeException e) { - if (tried < MAX_RETRY_ON_EXCEPTION) { - putWithRetry(bucketName, configuration, blob, file, tried + 1); - } else { - throw e; - } - } - } - - private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) { - try { - PutObjectRequest request = new PutObjectRequest(bucketName.asString(), - blob.getMetadata().getName(), - file); - - getTransferManager(configuration) - .upload(request) - .waitForUploadResult(); - } catch (AmazonClientException | InterruptedException e) { - throw new RuntimeException(e); - } - } - - private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) { - ClientConfiguration clientConfiguration = getClientConfiguration(); - AmazonS3 amazonS3 = getS3Client(configuration, clientConfiguration); - - return TransferManagerBuilder - .standard() - .withS3Client(amazonS3) - .withMultipartUploadThreshold(MULTIPART_UPLOAD_THRESHOLD.getValue()) - .withExecutorFactory(() -> executorService) - .withShutDownThreadPools(DO_NOT_SHUTDOWN_THREAD_POOL) - .build(); - } - - private static AmazonS3 getS3Client(AwsS3AuthConfiguration configuration, ClientConfiguration clientConfiguration) { - return AmazonS3ClientBuilder - .standard() - .withClientConfiguration(clientConfiguration) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(configuration.getAccessKeyId(), configuration.getSecretKey()))) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(configuration.getEndpoint(), null)) - .build(); - } - - private static ClientConfiguration getClientConfiguration() { - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(MAX_ERROR_RETRY)); - return clientConfiguration; + public Optional<BlobPutter> putBlob(AwsS3AuthConfiguration configuration) { + return Optional.of(new AwsS3BlobPutter(configuration, executorService)); } private static class BlobStoreBuilder implements Supplier<BlobStore> { @@ -188,4 +123,98 @@ public class AwsS3ObjectStorage { return ContextBuilder.newBuilder("s3"); } } + + private static class AwsS3BlobPutter implements BlobPutter { + private final AwsS3AuthConfiguration configuration; + private final ExecutorService executorService; + + AwsS3BlobPutter(AwsS3AuthConfiguration configuration, ExecutorService executorService) { + this.configuration = configuration; + this.executorService = executorService; + } + + @Override + public void putDirectly(BucketName bucketName, Blob blob) { + writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY)); + } + + @Override + public BlobId putAndComputeId(BucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) { + Consumer<File> putChangedBlob = (file) -> { + initialBlob.getMetadata().setName(blobIdSupplier.get().asString()); + putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY); + }; + writeFileAndAct(initialBlob, putChangedBlob); + return blobIdSupplier.get(); + } + + private void writeFileAndAct(Blob blob, Consumer<File> putFile) { + File file = null; + try { + file = File.createTempFile(UUID.randomUUID().toString(), ".tmp"); + FileUtils.copyToFile(blob.getPayload().openStream(), file); + putFile.accept(file); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (file != null) { + FileUtils.deleteQuietly(file); + } + } + } + + private void putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) { + try { + put(bucketName, configuration, blob, file); + } catch (RuntimeException e) { + if (tried < MAX_RETRY_ON_EXCEPTION) { + putWithRetry(bucketName, configuration, blob, file, tried + 1); + } else { + throw e; + } + } + } + + private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) { + try { + PutObjectRequest request = new PutObjectRequest(bucketName.asString(), + blob.getMetadata().getName(), + file); + + getTransferManager(configuration) + .upload(request) + .waitForUploadResult(); + } catch (AmazonClientException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) { + ClientConfiguration clientConfiguration = getClientConfiguration(); + AmazonS3 amazonS3 = getS3Client(configuration, clientConfiguration); + + return TransferManagerBuilder + .standard() + .withS3Client(amazonS3) + .withMultipartUploadThreshold(MULTIPART_UPLOAD_THRESHOLD.getValue()) + .withExecutorFactory(() -> executorService) + .withShutDownThreadPools(DO_NOT_SHUTDOWN_THREAD_POOL) + .build(); + } + + private static AmazonS3 getS3Client(AwsS3AuthConfiguration configuration, ClientConfiguration clientConfiguration) { + return AmazonS3ClientBuilder + .standard() + .withClientConfiguration(clientConfiguration) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(configuration.getAccessKeyId(), configuration.getSecretKey()))) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(configuration.getEndpoint(), null)) + .build(); + } + + private static ClientConfiguration getClientConfiguration() { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(MAX_ERROR_RETRY)); + return clientConfiguration; + } + } } diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java index 87b87f6..1592c99 100644 --- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java +++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java @@ -87,7 +87,7 @@ class AwsS3ObjectStorageBlobsDAOBuilderTest implements ObjectStorageBlobsDAOCont .builder(configuration) .defaultBucketName(defaultBucketName) .blobIdFactory(new HashBlobId.Factory()) - .putBlob(awsS3ObjectStorage.putBlob(configuration)); + .blobPutter(awsS3ObjectStorage.putBlob(configuration)); assertBlobsDAOCanStoreAndRetrieve(builder); } diff --git a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java index 550238e..fc7675b 100644 --- a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java +++ b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java @@ -32,9 +32,9 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.objectstorage.BlobPutter; import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAO; import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAOBuilder; -import org.apache.james.blob.objectstorage.PutBlobFunction; import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration; import org.apache.james.blob.objectstorage.aws.AwsS3ObjectStorage; import org.apache.james.modules.mailbox.ConfigurationComponent; @@ -70,7 +70,7 @@ public class ObjectStorageDependenciesModule extends AbstractModule { .defaultBucketName(configuration.getNamespace()) .blobIdFactory(blobIdFactory) .payloadCodec(configuration.getPayloadCodec()) - .putBlob(putBlob(blobIdFactory, configuration, awsS3ObjectStorageProvider)) + .blobPutter(putBlob(blobIdFactory, configuration, awsS3ObjectStorageProvider)) .build(); dao.createBucket(dao.getDefaultBucketName()).block(Duration.ofMinutes(1)); return dao; @@ -86,7 +86,7 @@ public class ObjectStorageDependenciesModule extends AbstractModule { throw new IllegalArgumentException("unknown provider " + configuration.getProvider()); } - private Optional<PutBlobFunction> putBlob(BlobId.Factory blobIdFactory, ObjectStorageBlobConfiguration configuration, Provider<AwsS3ObjectStorage> awsS3ObjectStorageProvider) { + private Optional<BlobPutter> putBlob(BlobId.Factory blobIdFactory, ObjectStorageBlobConfiguration configuration, Provider<AwsS3ObjectStorage> awsS3ObjectStorageProvider) { switch (configuration.getProvider()) { case SWIFT: return Optional.empty(); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org