This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-21631 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 02ffcd56b4e7a8979200830b69a5a17a2bc8efa8 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Jan 17 10:59:07 2025 +0100 CAMEL-21631 - Enable Checksum algorithm on S3 streaming upload producer Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- components/camel-aws/camel-aws2-s3/pom.xml | 5 +++++ .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 25 +++++++--------------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/pom.xml b/components/camel-aws/camel-aws2-s3/pom.xml index 6378926e0a6..d35fa75fd7f 100644 --- a/components/camel-aws/camel-aws2-s3/pom.xml +++ b/components/camel-aws/camel-aws2-s3/pom.xml @@ -45,6 +45,11 @@ <artifactId>s3</artifactId> <version>${aws-java-sdk2-version}</version> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>checksums</artifactId> + <version>${aws-java-sdk2-version}</version> + </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>apache-client</artifactId> diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java index 57c8459fb9c..b25d3ae0281 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java @@ -41,18 +41,7 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.BucketCannedACL; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; -import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; -import software.amazon.awssdk.services.s3.model.CompletedPart; -import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ObjectCannedACL; -import software.amazon.awssdk.services.s3.model.S3Object; -import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.*; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; /** @@ -68,6 +57,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { private final Lock lock = new ReentrantLock(); private transient String s3ProducerToString; private ScheduledExecutorService timeoutCheckerExecutorService; + private ChecksumAlgorithm algorithm = ChecksumAlgorithm.CRC32; public AWS2S3StreamUploadProducer(final Endpoint endpoint) { super(endpoint); @@ -194,7 +184,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { state.part, state.id); CreateMultipartUploadRequest.Builder createMultipartUploadRequest = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()) - .key(state.dynamicKeyName); + .key(state.dynamicKeyName).checksumAlgorithm(algorithm); String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration()); if (storageClass != null) { @@ -300,14 +290,15 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer { private void uploadPart(UploadState state) { UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName()) .key(state.dynamicKeyName).uploadId(state.initResponse.uploadId()) - .partNumber(state.multipartIndex).build(); + .partNumber(state.multipartIndex).checksumAlgorithm(algorithm).build(); LOG.trace("Uploading part {}, multipart {} at index {} for {}", state.part, state.multipartIndex, state.index, getConfiguration().getKeyName()); - String etag = getEndpoint().getS3Client() - .uploadPart(uploadRequest, RequestBody.fromBytes(state.buffer.toByteArray())).eTag(); - CompletedPart partUpload = CompletedPart.builder().partNumber(state.multipartIndex).eTag(etag).build(); + UploadPartResponse partResponse = getEndpoint().getS3Client() + .uploadPart(uploadRequest, RequestBody.fromBytes(state.buffer.toByteArray())); + CompletedPart partUpload = CompletedPart.builder().partNumber(state.multipartIndex) + .checksumCRC32(partResponse.checksumCRC32()).eTag(partResponse.eTag()).build(); state.completedParts.add(partUpload); state.buffer.reset(); state.multipartIndex++;