This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 14f042e11bc CAMEL-16871: camel-aws-s3 - multipartUpload to support
generic input … (#18805)
14f042e11bc is described below
commit 14f042e11bc1522a5eb917bfc9365a3c17877d17
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Aug 3 12:55:38 2025 +0200
CAMEL-16871: camel-aws-s3 - multipartUpload to support generic input …
(#18805)
* CAMEL-16871: camel-aws-s3 - multipartUpload to support generic input
stream also
---
.../apache/camel/catalog/components/aws2-s3.json | 4 +-
.../apache/camel/component/aws2/s3/aws2-s3.json | 4 +-
.../component/aws2/s3/AWS2S3Configuration.java | 4 +-
.../camel/component/aws2/s3/AWS2S3Producer.java | 99 +++++++++++----------
.../camel/component/aws2/s3/utils/AWS2S3Utils.java | 3 +
.../s3/integration/S3StreamUploadMultipartIT.java | 10 +--
...ltipartIT.java => S3UploadFileMultipartIT.java} | 35 ++++----
...IT.java => S3UploadInputStreamMultipartIT.java} | 37 ++++----
...UploadInputStreamMultipartNoStreamCacheIT.java} | 39 ++++----
.../camel-aws2-s3/src/test/resources/empty-big.bin | Bin 0 -> 10485760 bytes
.../dsl/Aws2S3ComponentBuilderFactory.java | 7 +-
.../endpoint/dsl/AWS2S3EndpointBuilderFactory.java | 14 +--
12 files changed, 129 insertions(+), 127 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-s3.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-s3.json
index 71939689b15..2c7d968d35f 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-s3.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-s3.json
@@ -59,7 +59,7 @@
"multiPartUpload": { "index": 32, "kind": "property", "displayName":
"Multi Part Upload", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "If it is true, camel
will upload the file with multipart format. The part size [...]
"namingStrategy": { "index": 33, "kind": "property", "displayName":
"Naming Strategy", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3NamingStrategyEnum", "enum": [
"progressive", "random" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "progressive", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuratio [...]
"operation": { "index": 34, "kind": "property", "displayName":
"Operation", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.AWS2S3Operations", "enum": [ "copyObject",
"listObjects", "deleteObject", "deleteBucket", "listBuckets", "getObject",
"getObjectRange", "createDownloadLink", "headBucket", "headObject" ],
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel [...]
- "partSize": { "index": 35, "kind": "property", "displayName": "Part Size",
"group": "producer", "label": "producer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25M. Camel will onl [...]
+ "partSize": { "index": 35, "kind": "property", "displayName": "Part Size",
"group": "producer", "label": "producer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25 MB. The minimum [...]
"restartingPolicy": { "index": 36, "kind": "property", "displayName":
"Restarting Policy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3RestartingPolicyEnum", "enum":
[ "override", "lastPart" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "override", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configurat [...]
"storageClass": { "index": 37, "kind": "property", "displayName": "Storage
Class", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "The storage class to set in the
com.amazonaws.services.s3.model.PutObjectRequest request." },
"streamingUploadMode": { "index": 38, "kind": "property", "displayName":
"Streaming Upload Mode", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "When stream mode is
true, the upload to bucket will be done in streaming" },
@@ -168,7 +168,7 @@
"multiPartUpload": { "index": 38, "kind": "parameter", "displayName":
"Multi Part Upload", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "If it is true, camel
will upload the file with multipart format. The part size [...]
"namingStrategy": { "index": 39, "kind": "parameter", "displayName":
"Naming Strategy", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3NamingStrategyEnum", "enum": [
"progressive", "random" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "progressive", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configurati [...]
"operation": { "index": 40, "kind": "parameter", "displayName":
"Operation", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.AWS2S3Operations", "enum": [ "copyObject",
"listObjects", "deleteObject", "deleteBucket", "listBuckets", "getObject",
"getObjectRange", "createDownloadLink", "headBucket", "headObject" ],
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.came [...]
- "partSize": { "index": 41, "kind": "parameter", "displayName": "Part
Size", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25M. Camel will on [...]
+ "partSize": { "index": 41, "kind": "parameter", "displayName": "Part
Size", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25 MB. The minimum [...]
"restartingPolicy": { "index": 42, "kind": "parameter", "displayName":
"Restarting Policy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3RestartingPolicyEnum", "enum":
[ "override", "lastPart" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "override", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configura [...]
"storageClass": { "index": 43, "kind": "parameter", "displayName":
"Storage Class", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "The storage class to set in the
com.amazonaws.services.s3.model.PutObjectRequest request." },
"streamingUploadMode": { "index": 44, "kind": "parameter", "displayName":
"Streaming Upload Mode", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "When stream mode is
true, the upload to bucket will be done in streaming" },
diff --git
a/components/camel-aws/camel-aws2-s3/src/generated/resources/META-INF/org/apache/camel/component/aws2/s3/aws2-s3.json
b/components/camel-aws/camel-aws2-s3/src/generated/resources/META-INF/org/apache/camel/component/aws2/s3/aws2-s3.json
index 71939689b15..2c7d968d35f 100644
---
a/components/camel-aws/camel-aws2-s3/src/generated/resources/META-INF/org/apache/camel/component/aws2/s3/aws2-s3.json
+++
b/components/camel-aws/camel-aws2-s3/src/generated/resources/META-INF/org/apache/camel/component/aws2/s3/aws2-s3.json
@@ -59,7 +59,7 @@
"multiPartUpload": { "index": 32, "kind": "property", "displayName":
"Multi Part Upload", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "If it is true, camel
will upload the file with multipart format. The part size [...]
"namingStrategy": { "index": 33, "kind": "property", "displayName":
"Naming Strategy", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3NamingStrategyEnum", "enum": [
"progressive", "random" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "progressive", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuratio [...]
"operation": { "index": 34, "kind": "property", "displayName":
"Operation", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.AWS2S3Operations", "enum": [ "copyObject",
"listObjects", "deleteObject", "deleteBucket", "listBuckets", "getObject",
"getObjectRange", "createDownloadLink", "headBucket", "headObject" ],
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel [...]
- "partSize": { "index": 35, "kind": "property", "displayName": "Part Size",
"group": "producer", "label": "producer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25M. Camel will onl [...]
+ "partSize": { "index": 35, "kind": "property", "displayName": "Part Size",
"group": "producer", "label": "producer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25 MB. The minimum [...]
"restartingPolicy": { "index": 36, "kind": "property", "displayName":
"Restarting Policy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3RestartingPolicyEnum", "enum":
[ "override", "lastPart" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "override", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configurat [...]
"storageClass": { "index": 37, "kind": "property", "displayName": "Storage
Class", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "The storage class to set in the
com.amazonaws.services.s3.model.PutObjectRequest request." },
"streamingUploadMode": { "index": 38, "kind": "property", "displayName":
"Streaming Upload Mode", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "When stream mode is
true, the upload to bucket will be done in streaming" },
@@ -168,7 +168,7 @@
"multiPartUpload": { "index": 38, "kind": "parameter", "displayName":
"Multi Part Upload", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "If it is true, camel
will upload the file with multipart format. The part size [...]
"namingStrategy": { "index": 39, "kind": "parameter", "displayName":
"Naming Strategy", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3NamingStrategyEnum", "enum": [
"progressive", "random" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "progressive", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configurati [...]
"operation": { "index": 40, "kind": "parameter", "displayName":
"Operation", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.aws2.s3.AWS2S3Operations", "enum": [ "copyObject",
"listObjects", "deleteObject", "deleteBucket", "listBuckets", "getObject",
"getObjectRange", "createDownloadLink", "headBucket", "headObject" ],
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.came [...]
- "partSize": { "index": 41, "kind": "parameter", "displayName": "Part
Size", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25M. Camel will on [...]
+ "partSize": { "index": 41, "kind": "parameter", "displayName": "Part
Size", "group": "producer", "label": "producer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 26214400, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "Set up the partSize which is used in multipart
upload, the default size is 25 MB. The minimum [...]
"restartingPolicy": { "index": 42, "kind": "parameter", "displayName":
"Restarting Policy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.aws2.s3.stream.AWSS3RestartingPolicyEnum", "enum":
[ "override", "lastPart" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "override", "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configura [...]
"storageClass": { "index": 43, "kind": "parameter", "displayName":
"Storage Class", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField":
"configuration", "description": "The storage class to set in the
com.amazonaws.services.s3.model.PutObjectRequest request." },
"streamingUploadMode": { "index": 44, "kind": "parameter", "displayName":
"Streaming Upload Mode", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration",
"configurationField": "configuration", "description": "When stream mode is
true, the upload to bucket will be done in streaming" },
diff --git
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
index 243261095a9..f84228d05db 100644
---
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
+++
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
@@ -149,7 +149,9 @@ public class AWS2S3Configuration implements Cloneable {
}
/**
- * Set up the partSize which is used in multipart upload, the default size
is 25M.
+ * Set up the partSize which is used in multipart upload, the default size
is 25 MB.
+ *
+ * The minimum size in AWS is 5 MB.
*
* Camel will only do multipart uploads for files that are larger than the
part-size thresholds. Files that are
* smaller will be uploaded in a single operation.
diff --git
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
index 68b7f85fc26..7e6180d577b 100644
---
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
+++
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
@@ -114,36 +114,51 @@ public class AWS2S3Producer extends DefaultProducer {
}
public void processMultiPart(final Exchange exchange) throws Exception {
- File filePayload;
- Object obj = exchange.getIn().getMandatoryBody();
+ Object obj = exchange.getIn().getBody();
+ InputStream inputStream;
+ File filePayload = null;
+
+ // the content-length may already be known
+ long contentLength =
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_LENGTH, -1, Long.class);
+
// Need to check if the message body is WrappedFile
- if (obj instanceof WrappedFile) {
- obj = ((WrappedFile<?>) obj).getFile();
+ if (obj instanceof WrappedFile<?> wf) {
+ obj = wf.getFile();
}
- if (obj instanceof File) {
- filePayload = (File) obj;
+ if (obj instanceof File f) {
+ filePayload = f;
+ inputStream = new FileInputStream(f);
+ contentLength = f.length();
} else {
- throw new IllegalArgumentException("aws2-s3: MultiPart upload
requires a File input.");
+ // otherwise convert the message body to an input stream
+ inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+ if (contentLength <= 0) {
+ contentLength =
AWS2S3Utils.determineLengthInputStream(inputStream);
+ if (contentLength == -1) {
+ // fall back to reading the stream into memory to calculate the length
+ LOG.debug(
+ "The content length is not defined. It needs to be
determined by reading the data into memory");
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOHelper.copyAndCloseInput(inputStream, baos);
+ byte[] arr = baos.toByteArray();
+ contentLength = arr.length;
+ inputStream = new ByteArrayInputStream(arr);
+ }
+ }
}
Map<String, String> objectMetadata = determineMetadata(exchange);
- Long contentLength =
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_LENGTH, Long.class);
- if (contentLength == null || contentLength == 0) {
- contentLength = filePayload.length();
- }
-
long partSize = getConfiguration().getPartSize();
if (contentLength == 0 || contentLength < partSize) {
// optimize to do a single op if content length is known and <
part size
- LOG.debug("File size < partSize. Uploading file in single
operation: {}", filePayload);
- processSingleOp(exchange);
+ LOG.debug("Payload size < partSize ({} > {}). Uploading payload in
single operation", contentLength, partSize);
+ doPutObject(exchange, objectMetadata, filePayload, inputStream,
contentLength);
return;
}
- LOG.debug("File size >= partSize. Uploading file using multi-part
operation: {}", filePayload);
-
- objectMetadata.put("Content-Length", contentLength.toString());
+ LOG.debug("Payload size >= partSize ({} > {}). Uploading payload using
multi-part operation", contentLength, partSize);
+ objectMetadata.put("Content-Length", "" + contentLength);
final String keyName = AWS2S3Utils.determineKey(exchange,
getConfiguration());
final String bucketName = AWS2S3Utils.determineBucketName(exchange,
getConfiguration());
@@ -198,36 +213,29 @@ public class AWS2S3Producer extends DefaultProducer {
List<CompletedPart> completedParts = new ArrayList<CompletedPart>();
CompleteMultipartUploadResponse uploadResult;
- long filePosition = 0;
+ long position = 0;
try {
- for (int part = 1; filePosition < contentLength; part++) {
- partSize = Math.min(partSize, contentLength - filePosition);
+ for (int part = 1; position < contentLength; part++) {
+ partSize = Math.min(partSize, contentLength - position);
UploadPartRequest uploadRequest =
UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
.key(keyName).uploadId(initResponse.uploadId())
.partNumber(part).build();
- LOG.trace("Uploading part [{}] for {}", part, keyName);
- try (InputStream fileInputStream = new
FileInputStream(filePayload)) {
- if (filePosition > 0) {
- long skipped = fileInputStream.skip(filePosition);
- if (skipped == 0) {
- LOG.warn("While trying to upload the file {} file,
0 bytes were skipped", keyName);
- }
- }
+ LOG.debug("Uploading multi-part [{}] at position: [{}] for
{}", part, position, keyName);
- String etag = getEndpoint().getS3Client()
- .uploadPart(uploadRequest,
RequestBody.fromInputStream(fileInputStream, partSize)).eTag();
- CompletedPart partUpload =
CompletedPart.builder().partNumber(part).eTag(etag).build();
- completedParts.add(partUpload);
- filePosition += partSize;
- }
+ String etag = getEndpoint().getS3Client()
+ .uploadPart(uploadRequest,
RequestBody.fromInputStream(inputStream, partSize)).eTag();
+ CompletedPart partUpload =
CompletedPart.builder().partNumber(part).eTag(etag).build();
+ completedParts.add(partUpload);
+ position += partSize;
}
- CompletedMultipartUpload completeMultipartUpload =
CompletedMultipartUpload.builder().parts(completedParts).build();
- CompleteMultipartUploadRequest.Builder compRequestBuilder;
- compRequestBuilder =
CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
-
.bucket(getConfiguration().getBucketName()).key(keyName).uploadId(initResponse.uploadId());
+ LOG.debug("Completing multi-part upload for {}", keyName);
+ CompletedMultipartUpload completeMultipartUpload =
CompletedMultipartUpload.builder().parts(completedParts).build();
+ CompleteMultipartUploadRequest.Builder compRequestBuilder =
CompleteMultipartUploadRequest.builder()
+
.multipartUpload(completeMultipartUpload).bucket(getConfiguration().getBucketName()).key(keyName)
+ .uploadId(initResponse.uploadId());
if (getConfiguration().isConditionalWritesEnabled()) {
compRequestBuilder.ifNoneMatch("*");
}
@@ -238,6 +246,8 @@ public class AWS2S3Producer extends DefaultProducer {
.abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
.key(keyName).uploadId(initResponse.uploadId()).build());
throw e;
+ } finally {
+ IOHelper.close(inputStream);
}
Message message = getMessageForResponse(exchange);
@@ -248,16 +258,12 @@ public class AWS2S3Producer extends DefaultProducer {
message.setHeader(AWS2S3Constants.VERSION_ID,
uploadResult.versionId());
}
- if (getConfiguration().isDeleteAfterWrite()) {
+ if (filePayload != null && getConfiguration().isDeleteAfterWrite()) {
FileUtil.deleteFile(filePayload);
}
}
public void processSingleOp(final Exchange exchange) throws Exception {
- PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder();
-
- Map<String, String> objectMetadata = determineMetadata(exchange);
-
// the content-length may already be known
long contentLength =
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_LENGTH, -1, Long.class);
@@ -292,7 +298,8 @@ public class AWS2S3Producer extends DefaultProducer {
}
}
- doPutObject(exchange, putObjectRequest, objectMetadata,
filePayload, inputStream, contentLength);
+ Map<String, String> objectMetadata = determineMetadata(exchange);
+ doPutObject(exchange, objectMetadata, filePayload, inputStream,
contentLength);
} finally {
IOHelper.close(inputStream);
}
@@ -303,10 +310,12 @@ public class AWS2S3Producer extends DefaultProducer {
}
private void doPutObject(
- Exchange exchange, PutObjectRequest.Builder putObjectRequest,
Map<String, String> objectMetadata,
+ Exchange exchange, Map<String, String> objectMetadata,
File file, InputStream inputStream, long contentLength) {
final String bucketName = AWS2S3Utils.determineBucketName(exchange,
getConfiguration());
final String keyName = AWS2S3Utils.determineKey(exchange,
getConfiguration());
+
+ PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder();
putObjectRequest.bucket(bucketName).key(keyName).metadata(objectMetadata);
String storageClass = AWS2S3Utils.determineStorageClass(exchange,
getConfiguration());
diff --git
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
index 3ff68d36065..37c008ca8f9 100644
---
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
+++
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.s3.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -95,6 +96,8 @@ public final class AWS2S3Utils {
if (len > 0) {
return len;
}
+ } else if (is instanceof FileInputStream fis) {
+ return fis.getChannel().size();
}
if (!is.markSupported()) {
diff --git
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
index d4401155514..cec920bfeed 100644
---
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
+++
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.s3.AWS2S3Constants;
import org.apache.camel.component.aws2.s3.AWS2S3Operations;
@@ -33,21 +32,19 @@ import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.S3Object;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
public class S3StreamUploadMultipartIT extends Aws2S3Base {
- @EndpointInject
- private ProducerTemplate template;
-
@EndpointInject("mock:result")
private MockEndpoint result;
@Test
- public void sendIn() throws Exception {
+ public void sendFile() throws Exception {
result.expectedMessageCount(10);
for (int i = 0; i < 10; i++) {
- template.send("direct:stream1", new Processor() {
+ Exchange out = template.send("direct:stream1", new Processor() {
@Override
public void process(Exchange exchange) {
@@ -55,6 +52,7 @@ public class S3StreamUploadMultipartIT extends Aws2S3Base {
exchange.getIn().setBody(new
File("src/test/resources/empty.bin"));
}
});
+ assertFalse(out.isFailed());
}
MockEndpoint.assertIsSatisfied(context);
diff --git
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadFileMultipartIT.java
similarity index 73%
copy from
components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
copy to
components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadFileMultipartIT.java
index d4401155514..a7845fe8137 100644
---
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
+++
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadFileMultipartIT.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.s3.AWS2S3Constants;
import org.apache.camel.component.aws2.s3.AWS2S3Operations;
@@ -33,29 +32,26 @@ import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.S3Object;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
-public class S3StreamUploadMultipartIT extends Aws2S3Base {
-
- @EndpointInject
- private ProducerTemplate template;
+public class S3UploadFileMultipartIT extends Aws2S3Base {
@EndpointInject("mock:result")
private MockEndpoint result;
@Test
- public void sendIn() throws Exception {
- result.expectedMessageCount(10);
+ public void sendFile() throws Exception {
+ result.expectedMessageCount(1);
- for (int i = 0; i < 10; i++) {
- template.send("direct:stream1", new Processor() {
+ Exchange out = template.send("direct:stream1", new Processor() {
- @Override
- public void process(Exchange exchange) {
- exchange.getIn().setHeader(AWS2S3Constants.KEY,
"empty.bin");
- exchange.getIn().setBody(new
File("src/test/resources/empty.bin"));
- }
- });
- }
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY,
"empty-big.bin");
+ exchange.getIn().setBody(new
File("src/test/resources/empty-big.bin"));
+ }
+ });
+ assertFalse(out.isFailed());
MockEndpoint.assertIsSatisfied(context);
@@ -67,11 +63,10 @@ public class S3StreamUploadMultipartIT extends Aws2S3Base {
}
});
- // file size: 5,242,880 bytes, 10 * (5 chunks of 1,000,000 + remainder
of 242,880)
List<S3Object> resp = ex.getMessage().getBody(List.class);
- assertEquals(60, resp.size());
+ assertEquals(1, resp.size());
- assertEquals(10 *
Files.size(Paths.get("src/test/resources/empty.bin")),
+ assertEquals(1 *
Files.size(Paths.get("src/test/resources/empty-big.bin")),
resp.stream().mapToLong(S3Object::size).sum());
}
@@ -82,7 +77,7 @@ public class S3StreamUploadMultipartIT extends Aws2S3Base {
public void configure() {
String awsEndpoint1
= String.format(
-
"aws2-s3://%s?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random",
+
"aws2-s3://%s?autoCreateBucket=true&keyName=fileTest.txt&multiPartUpload=true&partSize=6000000",
name.get());
from("direct:stream1").to(awsEndpoint1).to("mock:result");
diff --git
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadInputStreamMultipartIT.java
similarity index 72%
copy from
components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
copy to
components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadInputStreamMultipartIT.java
index d4401155514..9f0127a6eb8 100644
---
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
+++
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadInputStreamMultipartIT.java
@@ -16,7 +16,7 @@
*/
package org.apache.camel.component.aws2.s3.integration;
-import java.io.File;
+import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
@@ -24,38 +24,34 @@ import java.util.List;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.s3.AWS2S3Constants;
import org.apache.camel.component.aws2.s3.AWS2S3Operations;
import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.S3Object;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class S3StreamUploadMultipartIT extends Aws2S3Base {
-
- @EndpointInject
- private ProducerTemplate template;
+public class S3UploadInputStreamMultipartIT extends Aws2S3Base {
@EndpointInject("mock:result")
private MockEndpoint result;
@Test
- public void sendIn() throws Exception {
- result.expectedMessageCount(10);
+ public void sendInputStream() throws Exception {
+ result.expectedMessageCount(1);
- for (int i = 0; i < 10; i++) {
- template.send("direct:stream1", new Processor() {
+ Exchange out = template.send("direct:stream1", new Processor() {
- @Override
- public void process(Exchange exchange) {
- exchange.getIn().setHeader(AWS2S3Constants.KEY,
"empty.bin");
- exchange.getIn().setBody(new
File("src/test/resources/empty.bin"));
- }
- });
- }
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY,
"empty-big.bin");
+ exchange.getIn().setBody(new
FileInputStream("src/test/resources/empty-big.bin"));
+ }
+ });
+ Assertions.assertFalse(out.isFailed());
MockEndpoint.assertIsSatisfied(context);
@@ -67,11 +63,10 @@ public class S3StreamUploadMultipartIT extends Aws2S3Base {
}
});
- // file size: 5,242,880 bytes, 10 * (5 chunks of 1,000,000 + remainder
of 242,880)
List<S3Object> resp = ex.getMessage().getBody(List.class);
- assertEquals(60, resp.size());
+ assertEquals(1, resp.size());
- assertEquals(10 *
Files.size(Paths.get("src/test/resources/empty.bin")),
+ assertEquals(1 *
Files.size(Paths.get("src/test/resources/empty-big.bin")),
resp.stream().mapToLong(S3Object::size).sum());
}
@@ -82,7 +77,7 @@ public class S3StreamUploadMultipartIT extends Aws2S3Base {
public void configure() {
String awsEndpoint1
= String.format(
-
"aws2-s3://%s?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random",
+
"aws2-s3://%s?autoCreateBucket=true&keyName=fileTest.txt&multiPartUpload=true&partSize=6000000",
name.get());
from("direct:stream1").to(awsEndpoint1).to("mock:result");
diff --git
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadInputStreamMultipartNoStreamCacheIT.java
similarity index 72%
copy from
components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
copy to
components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadInputStreamMultipartNoStreamCacheIT.java
index d4401155514..d4c17a7d6b2 100644
---
a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartIT.java
+++
b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3UploadInputStreamMultipartNoStreamCacheIT.java
@@ -16,7 +16,7 @@
*/
package org.apache.camel.component.aws2.s3.integration;
-import java.io.File;
+import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
@@ -24,38 +24,34 @@ import java.util.List;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.s3.AWS2S3Constants;
import org.apache.camel.component.aws2.s3.AWS2S3Operations;
import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.S3Object;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class S3StreamUploadMultipartIT extends Aws2S3Base {
-
- @EndpointInject
- private ProducerTemplate template;
+public class S3UploadInputStreamMultipartNoStreamCacheIT extends Aws2S3Base {
@EndpointInject("mock:result")
private MockEndpoint result;
@Test
- public void sendIn() throws Exception {
- result.expectedMessageCount(10);
+ public void sendInputStream() throws Exception {
+ result.expectedMessageCount(1);
- for (int i = 0; i < 10; i++) {
- template.send("direct:stream1", new Processor() {
+ Exchange out = template.send("direct:stream1", new Processor() {
- @Override
- public void process(Exchange exchange) {
- exchange.getIn().setHeader(AWS2S3Constants.KEY,
"empty.bin");
- exchange.getIn().setBody(new
File("src/test/resources/empty.bin"));
- }
- });
- }
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY,
"empty-big.bin");
+ exchange.getIn().setBody(new
FileInputStream("src/test/resources/empty-big.bin"));
+ }
+ });
+ Assertions.assertFalse(out.isFailed());
MockEndpoint.assertIsSatisfied(context);
@@ -67,11 +63,10 @@ public class S3StreamUploadMultipartIT extends Aws2S3Base {
}
});
- // file size: 5,242,880 bytes, 10 * (5 chunks of 1,000,000 + remainder
of 242,880)
List<S3Object> resp = ex.getMessage().getBody(List.class);
- assertEquals(60, resp.size());
+ assertEquals(1, resp.size());
- assertEquals(10 *
Files.size(Paths.get("src/test/resources/empty.bin")),
+ assertEquals(1 *
Files.size(Paths.get("src/test/resources/empty-big.bin")),
resp.stream().mapToLong(S3Object::size).sum());
}
@@ -80,9 +75,11 @@ public class S3StreamUploadMultipartIT extends Aws2S3Base {
return new RouteBuilder() {
@Override
public void configure() {
+ context.setStreamCaching(false);
+
String awsEndpoint1
= String.format(
-
"aws2-s3://%s?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random",
+
"aws2-s3://%s?autoCreateBucket=true&keyName=fileTest.txt&multiPartUpload=true&partSize=6000000",
name.get());
from("direct:stream1").to(awsEndpoint1).to("mock:result");
diff --git
a/components/camel-aws/camel-aws2-s3/src/test/resources/empty-big.bin
b/components/camel-aws/camel-aws2-s3/src/test/resources/empty-big.bin
new file mode 100644
index 00000000000..6c5d4031e03
Binary files /dev/null and
b/components/camel-aws/camel-aws2-s3/src/test/resources/empty-big.bin differ
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
index f582680793e..b778dcb9638 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
@@ -695,9 +695,10 @@ public interface Aws2S3ComponentBuilderFactory {
/**
* Set up the partSize which is used in multipart upload, the default
- * size is 25M. Camel will only do multipart uploads for files that are
- * larger than the part-size thresholds. Files that are smaller will be
- * uploaded in a single operation.
+ * size is 25 MB. The minimum size in AWS is 5 MB. Camel will only do
+ * multipart uploads for files that are larger than the part-size
+ * thresholds. Files that are smaller will be uploaded in a single
+ * operation.
*
* The option is a: <code>long</code> type.
*
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
index 8f1fc44bea5..0f2bcccc41e 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
@@ -2220,9 +2220,10 @@ public interface AWS2S3EndpointBuilderFactory {
}
/**
* Set up the partSize which is used in multipart upload, the default
- * size is 25M. Camel will only do multipart uploads for files that are
- * larger than the part-size thresholds. Files that are smaller will be
- * uploaded in a single operation.
+ * size is 25 MB. The minimum size in AWS is 5 MB. Camel will only do
+ * multipart uploads for files that are larger than the part-size
+ * thresholds. Files that are smaller will be uploaded in a single
+ * operation.
*
* The option is a: <code>long</code> type.
*
@@ -2238,9 +2239,10 @@ public interface AWS2S3EndpointBuilderFactory {
}
/**
* Set up the partSize which is used in multipart upload, the default
- * size is 25M. Camel will only do multipart uploads for files that are
- * larger than the part-size thresholds. Files that are smaller will be
- * uploaded in a single operation.
+ * size is 25 MB. The minimum size in AWS is 5 MB. Camel will only do
+ * multipart uploads for files that are larger than the part-size
+ * thresholds. Files that are smaller will be uploaded in a single
+ * operation.
*
* The option will be converted to a <code>long</code> type.
*