This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-21631
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 02ffcd56b4e7a8979200830b69a5a17a2bc8efa8
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Jan 17 10:59:07 2025 +0100

    CAMEL-21631 - Enable Checksum algorithm on S3 streaming upload producer
    
    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 components/camel-aws/camel-aws2-s3/pom.xml         |  5 +++++
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 25 +++++++---------------
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/pom.xml b/components/camel-aws/camel-aws2-s3/pom.xml
index 6378926e0a6..d35fa75fd7f 100644
--- a/components/camel-aws/camel-aws2-s3/pom.xml
+++ b/components/camel-aws/camel-aws2-s3/pom.xml
@@ -45,6 +45,11 @@
             <artifactId>s3</artifactId>
             <version>${aws-java-sdk2-version}</version>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>checksums</artifactId>
+            <version>${aws-java-sdk2-version}</version>
+        </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>apache-client</artifactId>
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 57c8459fb9c..b25d3ae0281 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -41,18 +41,7 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.BucketCannedACL;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
-import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
-import software.amazon.awssdk.services.s3.model.CompletedPart;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
-import software.amazon.awssdk.services.s3.model.S3Object;
-import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.*;
 import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
 
 /**
@@ -68,6 +57,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     private final Lock lock = new ReentrantLock();
     private transient String s3ProducerToString;
     private ScheduledExecutorService timeoutCheckerExecutorService;
+    private ChecksumAlgorithm algorithm = ChecksumAlgorithm.CRC32;
 
     public AWS2S3StreamUploadProducer(final Endpoint endpoint) {
         super(endpoint);
@@ -194,7 +184,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                     state.part, state.id);
             CreateMultipartUploadRequest.Builder createMultipartUploadRequest
                     = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
-                            .key(state.dynamicKeyName);
+                            .key(state.dynamicKeyName).checksumAlgorithm(algorithm);
 
             String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration());
             if (storageClass != null) {
@@ -300,14 +290,15 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     private void uploadPart(UploadState state) {
         UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
                 .key(state.dynamicKeyName).uploadId(state.initResponse.uploadId())
-                .partNumber(state.multipartIndex).build();
+                .partNumber(state.multipartIndex).checksumAlgorithm(algorithm).build();
 
         LOG.trace("Uploading part {}, multipart {} at index {} for {}", state.part, state.multipartIndex, state.index,
                 getConfiguration().getKeyName());
 
-        String etag = getEndpoint().getS3Client()
-                .uploadPart(uploadRequest, RequestBody.fromBytes(state.buffer.toByteArray())).eTag();
-        CompletedPart partUpload = CompletedPart.builder().partNumber(state.multipartIndex).eTag(etag).build();
+        UploadPartResponse partResponse = getEndpoint().getS3Client()
+                .uploadPart(uploadRequest, RequestBody.fromBytes(state.buffer.toByteArray()));
+        CompletedPart partUpload = CompletedPart.builder().partNumber(state.multipartIndex)
+                .checksumCRC32(partResponse.checksumCRC32()).eTag(partResponse.eTag()).build();
         state.completedParts.add(partUpload);
         state.buffer.reset();
         state.multipartIndex++;

Reply via email to