This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c1a21ad078 NIFI-12642 Added support for FileResourceService in PutS3Object
c1a21ad078 is described below

commit c1a21ad078d7b9c3b0422c48b9bb1f9faa3c0b6f
Author: Balázs Gerner <balazsger...@gmail.com>
AuthorDate: Mon Jan 22 13:11:18 2024 +0100

    NIFI-12642 Added support for FileResourceService in PutS3Object
    
    This closes #8295.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml    |  15 +
 .../apache/nifi/processors/aws/s3/PutS3Object.java | 575 +++++++++++----------
 .../nifi/processors/aws/s3/AbstractS3IT.java       |  83 +--
 .../nifi/processors/aws/s3/ITPutS3Object.java      |  65 +++
 .../nifi/processors/aws/s3/TestPutS3Object.java    |  51 +-
 5 files changed, 462 insertions(+), 327 deletions(-)
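The substance of the change is in PutS3Object.java below: the processor now resolves its upload source through an optional FileResource (supplied by a configured FileResourceService) and falls back to the FlowFile content otherwise. A minimal sketch of that selection pattern follows; the FileResource interface here is a stand-in reduced to the two methods the processor actually calls, so this is illustrative rather than the NiFi API verbatim.

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.util.Optional;

    // Stand-in for org.apache.nifi.fileresource.service.api.FileResource,
    // reduced to the two accessors PutS3Object uses in the diff below.
    interface FileResource {
        InputStream getInputStream();
        long getSize();
    }

    class ResourceSelectionSketch {
        // Upload the local file when a FileResource was resolved, otherwise
        // fall back to the FlowFile content stream.
        static InputStream selectStream(Optional<FileResource> fileResource, InputStream flowFileContent) {
            return fileResource.map(FileResource::getInputStream).orElse(flowFileContent);
        }

        // The reported content length must track the same choice, so the
        // S3 ObjectMetadata matches whichever source is actually sent.
        static long selectLength(Optional<FileResource> fileResource, long flowFileSize) {
            return fileResource.map(FileResource::getSize).orElse(flowFileSize);
        }

        public static void main(String[] args) {
            InputStream in = selectStream(Optional.empty(), new ByteArrayInputStream(new byte[0]));
            System.out.println(in != null && selectLength(Optional.empty(), 42L) == 42L);
        }
    }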

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index c0748af598..44ac6da9f3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -38,6 +38,15 @@
             <artifactId>nifi-listed-entity</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-resource-transfer</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-file-resource-service-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-aws-abstract-processors</artifactId>
@@ -141,6 +150,12 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-file-resource-service</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 3247f9a6a8..26c759417a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -53,14 +53,15 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.fileresource.service.api.FileResource;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -78,6 +79,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
@@ -87,6 +89,10 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
+
 @SupportsBatching
 @SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -261,6 +267,8 @@ public class PutS3Object extends AbstractS3Processor {
             KEY,
             S3_REGION,
             AWS_CREDENTIALS_PROVIDER_SERVICE,
+            RESOURCE_TRANSFER_SOURCE,
+            FILE_RESOURCE_SERVICE,
             STORAGE_CLASS,
             ENCRYPTION_SERVICE,
             SERVER_SIDE_ENCRYPTION,
@@ -501,6 +509,8 @@ public class PutS3Object extends AbstractS3Processor {
         final FlowFile ff = flowFile;
         final Map<String, String> attributes = new HashMap<>();
         final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
+        final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
+
         attributes.put(S3_BUCKET_KEY, bucket);
         attributes.put(S3_OBJECT_KEY, key);
 
@@ -519,329 +529,332 @@ public class PutS3Object extends AbstractS3Processor {
          */
         try {
             final FlowFile flowFileCopy = flowFile;
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    final ObjectMetadata objectMetadata = new ObjectMetadata();
-                    objectMetadata.setContentLength(ff.getSize());
-
-                    final String contentType = context.getProperty(CONTENT_TYPE)
-                            .evaluateAttributeExpressions(ff).getValue();
-                    if (contentType != null) {
-                        objectMetadata.setContentType(contentType);
-                        attributes.put(S3_CONTENT_TYPE, contentType);
-                    }
+            Optional<FileResource> optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
+            try (InputStream in = optFileResource
+                    .map(FileResource::getInputStream)
+                    .orElseGet(() -> session.read(flowFileCopy))) {
+                final ObjectMetadata objectMetadata = new ObjectMetadata();
+                objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize));
+
+                final String contentType = context.getProperty(CONTENT_TYPE)
+                        .evaluateAttributeExpressions(ff).getValue();
+                if (contentType != null) {
+                    objectMetadata.setContentType(contentType);
+                    attributes.put(S3_CONTENT_TYPE, contentType);
+                }
+
+                final String cacheControl = context.getProperty(CACHE_CONTROL)
+                        .evaluateAttributeExpressions(ff).getValue();
+                if (cacheControl != null) {
+                    objectMetadata.setCacheControl(cacheControl);
+                    attributes.put(S3_CACHE_CONTROL, cacheControl);
+                }
 
-                    final String cacheControl = context.getProperty(CACHE_CONTROL)
-                            .evaluateAttributeExpressions(ff).getValue();
-                    if (cacheControl != null) {
-                        objectMetadata.setCacheControl(cacheControl);
-                        attributes.put(S3_CACHE_CONTROL, cacheControl);
+                final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
+                String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), StandardCharsets.UTF_8);
+                if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
+                    objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
+                    attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
+                } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
+                    String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
+                    objectMetadata.setContentDisposition(contentDispositionValue);
+                    attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
+                } else {
+                    objectMetadata.setContentDisposition(fileName);
+                }
+
+                final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
+                        .evaluateAttributeExpressions(ff).getValue();
+                if (expirationRule != null) {
+                    objectMetadata.setExpirationTimeRuleId(expirationRule);
+                }
+
+                final Map<String, String> userMetadata = new HashMap<>();
+                for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+                    if (entry.getKey().isDynamic()) {
+                        final String value = context.getProperty(
+                                entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+                        userMetadata.put(entry.getKey().getName(), value);
                     }
+                }
 
-                    final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
-                    String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), StandardCharsets.UTF_8);
-                    if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
-                        objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
-                        attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
-                    } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
-                        String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
-                        objectMetadata.setContentDisposition(contentDispositionValue);
-                        attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
-                    } else {
-                        objectMetadata.setContentDisposition(fileName);
+                final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
+                AmazonS3EncryptionService encryptionService = null;
+
+                if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
+                    objectMetadata.setSSEAlgorithm(serverSideEncryption);
+                    attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
+                } else {
+                    encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+                }
+
+                if (!userMetadata.isEmpty()) {
+                    objectMetadata.setUserMetadata(userMetadata);
+                }
+
+                if (ff.getSize() <= multipartThreshold) {
+                    //----------------------------------------
+                    // single part upload
+                    //----------------------------------------
+                    final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+                    if (encryptionService != null) {
+                        encryptionService.configurePutObjectRequest(request, objectMetadata);
+                        attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
                     }
 
-                    final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
-                            .evaluateAttributeExpressions(ff).getValue();
-                    if (expirationRule != null) {
-                        objectMetadata.setExpirationTimeRuleId(expirationRule);
+                    request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                    final AccessControlList acl = createACL(context, ff);
+                    if (acl != null) {
+                        request.setAccessControlList(acl);
                     }
 
-                    final Map<String, String> userMetadata = new HashMap<>();
-                    for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
-                        if (entry.getKey().isDynamic()) {
-                            final String value = context.getProperty(
-                                    entry.getKey()).evaluateAttributeExpressions(ff).getValue();
-                            userMetadata.put(entry.getKey().getName(), value);
-                        }
+                    final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
+                    if (cannedAcl != null) {
+                        request.withCannedAcl(cannedAcl);
                     }
 
-                    final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
-                    AmazonS3EncryptionService encryptionService = null;
+                    if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
+                        request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
+                    }
 
-                    if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
-                        objectMetadata.setSSEAlgorithm(serverSideEncryption);
-                        attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
-                    } else {
-                        encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+                    try {
+                        final PutObjectResult result = s3.putObject(request);
+                        if (result.getVersionId() != null) {
+                            attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
+                        }
+                        if (result.getETag() != null) {
+                            attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+                        }
+                        if (result.getExpirationTime() != null) {
+                            attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
+                        }
+                        if (result.getMetadata().getStorageClass() != null) {
+                            attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
+                        } else {
+                            attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
+                        }
+                        if (userMetadata.size() > 0) {
+                            StringBuilder userMetaBldr = new StringBuilder();
+                            for (String userKey : userMetadata.keySet()) {
+                                userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+                            }
+                            attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+                        }
+                        attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
+                    } catch (AmazonClientException e) {
+                        getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+                                ffFilename, bucket, key, e.getMessage());
+                        throw (e);
                     }
+                } else {
+                    //----------------------------------------
+                    // multipart upload
+                    //----------------------------------------
 
-                    if (!userMetadata.isEmpty()) {
-                        objectMetadata.setUserMetadata(userMetadata);
+                    // load or create persistent state
+                    //------------------------------------------------------------
+                    MultipartState currentState;
+                    try {
+                        currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
+                        if (currentState != null) {
+                            if (currentState.getPartETags().size() > 0) {
+                                final PartETag lastETag = currentState.getPartETags().get(
+                                        currentState.getPartETags().size() - 1);
+                                getLogger().info("Resuming upload for 
flowfile='{}' bucket='{}' key='{}' " +
+                                                "uploadID='{}' 
filePosition='{}' partSize='{}' storageClass='{}' " +
+                                                "contentLength='{}' 
partsLoaded={} lastPart={}/{}",
+                                        ffFilename, bucket, key, 
currentState.getUploadId(),
+                                        currentState.getFilePosition(), 
currentState.getPartSize(),
+                                        
currentState.getStorageClass().toString(),
+                                        currentState.getContentLength(),
+                                        currentState.getPartETags().size(),
+                                        
Integer.toString(lastETag.getPartNumber()),
+                                        lastETag.getETag());
+                            } else {
+                                getLogger().info("Resuming upload for 
flowfile='{}' bucket='{}' key='{}' " +
+                                                "uploadID='{}' 
filePosition='{}' partSize='{}' storageClass='{}' " +
+                                                "contentLength='{}' no 
partsLoaded",
+                                        ffFilename, bucket, key, 
currentState.getUploadId(),
+                                        currentState.getFilePosition(), 
currentState.getPartSize(),
+                                        
currentState.getStorageClass().toString(),
+                                        currentState.getContentLength());
+                            }
+                        } else {
+                            currentState = new MultipartState();
+                            currentState.setPartSize(multipartPartSize);
+                            currentState.setStorageClass(
+                                    StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                            currentState.setContentLength(ff.getSize());
+                            persistLocalState(cacheKey, currentState);
+                            getLogger().info("Starting new upload for 
flowfile='{}' bucket='{}' key='{}'",
+                                    ffFilename, bucket, key);
+                        }
+                    } catch (IOException e) {
+                        getLogger().error("IOException initiating cache state 
while processing flow files: " +
+                                e.getMessage());
+                        throw (e);
                     }
 
-                    if (ff.getSize() <= multipartThreshold) {
-                        //----------------------------------------
-                        // single part upload
-                        //----------------------------------------
-                        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+                    // initiate multipart upload or find position in file
+                    //------------------------------------------------------------
+                    if (currentState.getUploadId().isEmpty()) {
+                        final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
                         if (encryptionService != null) {
-                            encryptionService.configurePutObjectRequest(request, objectMetadata);
+                            encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata);
                             attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
                         }
+                        initiateRequest.setStorageClass(currentState.getStorageClass());
 
-                        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
                         final AccessControlList acl = createACL(context, ff);
                         if (acl != null) {
-                            request.setAccessControlList(acl);
+                            initiateRequest.setAccessControlList(acl);
                         }
-
                         final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
                         if (cannedAcl != null) {
-                            request.withCannedAcl(cannedAcl);
+                            initiateRequest.withCannedACL(cannedAcl);
                         }
 
                         if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
-                            request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
+                            initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
                         }
 
                         try {
-                            final PutObjectResult result = s3.putObject(request);
-                            if (result.getVersionId() != null) {
-                                attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
-                            }
-                            if (result.getETag() != null) {
-                                attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
-                            }
-                            if (result.getExpirationTime() != null) {
-                                attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
-                            }
-                            if (result.getMetadata().getStorageClass() != null) {
-                                attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
-                            } else {
-                                attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
+                            final InitiateMultipartUploadResult initiateResult =
+                                    s3.initiateMultipartUpload(initiateRequest);
+                            currentState.setUploadId(initiateResult.getUploadId());
+                            currentState.getPartETags().clear();
+                            try {
+                                persistLocalState(cacheKey, currentState);
+                            } catch (Exception e) {
+                                getLogger().info("Exception saving cache state 
while processing flow file: " +
+                                        e.getMessage());
+                                throw (new ProcessException("Exception saving 
cache state", e));
                             }
-                            if (userMetadata.size() > 0) {
-                                StringBuilder userMetaBldr = new StringBuilder();
-                                for (String userKey : userMetadata.keySet()) {
-                                    userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
-                                }
-                                attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+                            getLogger().info("Success initiating upload 
flowfile={} available={} position={} " +
+                                            "length={} bucket={} key={} 
uploadId={}",
+                                    new Object[]{ffFilename, in.available(), 
currentState.getFilePosition(),
+                                            currentState.getContentLength(), 
bucket, key,
+                                            currentState.getUploadId()});
+                            if (initiateResult.getUploadId() != null) {
+                                attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
                             }
-                            attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
                         } catch (AmazonClientException e) {
-                            getLogger().info("Failure completing upload 
flowfile={} bucket={} key={} reason={}",
-                                    ffFilename, bucket, key, e.getMessage());
+                            getLogger().info("Failure initiating upload 
flowfile={} bucket={} key={} reason={}",
+                                    new Object[]{ffFilename, bucket, key, 
e.getMessage()});
                             throw (e);
                         }
                     } else {
-                        //----------------------------------------
-                        // multipart upload
-                        //----------------------------------------
-
-                        // load or create persistent state
-                        //------------------------------------------------------------
-                        MultipartState currentState;
-                        try {
-                            currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
-                            if (currentState != null) {
-                                if (currentState.getPartETags().size() > 0) {
-                                    final PartETag lastETag = currentState.getPartETags().get(
-                                            currentState.getPartETags().size() - 1);
-                                    getLogger().info("Resuming upload for 
flowfile='{}' bucket='{}' key='{}' " +
-                                                    "uploadID='{}' 
filePosition='{}' partSize='{}' storageClass='{}' " +
-                                                    "contentLength='{}' 
partsLoaded={} lastPart={}/{}",
-                                            ffFilename, bucket, key, 
currentState.getUploadId(),
-                                            currentState.getFilePosition(), 
currentState.getPartSize(),
-                                            
currentState.getStorageClass().toString(),
-                                            currentState.getContentLength(),
-                                            currentState.getPartETags().size(),
-                                            Integer.toString(lastETag.getPartNumber()),
-                                            lastETag.getETag());
-                                } else {
-                                    getLogger().info("Resuming upload for 
flowfile='{}' bucket='{}' key='{}' " +
-                                                    "uploadID='{}' 
filePosition='{}' partSize='{}' storageClass='{}' " +
-                                                    "contentLength='{}' no 
partsLoaded",
-                                            ffFilename, bucket, key, 
currentState.getUploadId(),
-                                            currentState.getFilePosition(), 
currentState.getPartSize(),
-                                            
currentState.getStorageClass().toString(),
-                                            currentState.getContentLength());
-                                }
-                            } else {
-                                currentState = new MultipartState();
-                                currentState.setPartSize(multipartPartSize);
-                                currentState.setStorageClass(
-                                        StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
-                                currentState.setContentLength(ff.getSize());
-                                persistLocalState(cacheKey, currentState);
-                                getLogger().info("Starting new upload for 
flowfile='{}' bucket='{}' key='{}'",
-                                        ffFilename, bucket, key);
-                            }
-                        } catch (IOException e) {
-                            getLogger().error("IOException initiating cache 
state while processing flow files: " +
-                                    e.getMessage());
-                            throw (e);
-                        }
-
-                        // initiate multipart upload or find position in file
-                        //------------------------------------------------------------
-                        if (currentState.getUploadId().isEmpty()) {
-                            final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
-                            if (encryptionService != null) {
-                                encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata);
-                                attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
-                            }
-                            initiateRequest.setStorageClass(currentState.getStorageClass());
-
-                            final AccessControlList acl = createACL(context, ff);
-                            if (acl != null) {
-                                initiateRequest.setAccessControlList(acl);
-                            }
-                            final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
-                            if (cannedAcl != null) {
-                                initiateRequest.withCannedACL(cannedAcl);
-                            }
-
-                            if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
-                                initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
-                            }
-
+                        if (currentState.getFilePosition() > 0) {
                             try {
-                                final InitiateMultipartUploadResult initiateResult =
-                                        s3.initiateMultipartUpload(initiateRequest);
-                                currentState.setUploadId(initiateResult.getUploadId());
-                                currentState.getPartETags().clear();
-                                try {
-                                    persistLocalState(cacheKey, currentState);
-                                } catch (Exception e) {
-                                    getLogger().info("Exception saving cache 
state while processing flow file: " +
-                                            e.getMessage());
-                                    throw (new ProcessException("Exception 
saving cache state", e));
-                                }
-                                getLogger().info("Success initiating upload 
flowfile={} available={} position={} " +
-                                                "length={} bucket={} key={} 
uploadId={}",
-                                        new Object[]{ffFilename, 
in.available(), currentState.getFilePosition(),
-                                                
currentState.getContentLength(), bucket, key,
-                                                currentState.getUploadId()});
-                                if (initiateResult.getUploadId() != null) {
-                                    attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
-                                }
-                            } catch (AmazonClientException e) {
-                                getLogger().info("Failure initiating upload 
flowfile={} bucket={} key={} reason={}",
-                                        new Object[]{ffFilename, bucket, key, 
e.getMessage()});
-                                throw (e);
-                            }
-                        } else {
-                            if (currentState.getFilePosition() > 0) {
-                                try {
-                                    final long skipped = in.skip(currentState.getFilePosition());
-                                    if (skipped != currentState.getFilePosition()) {
-                                        getLogger().info("Failure skipping to resume upload flowfile={} " +
-                                                        "bucket={} key={} position={} skipped={}",
-                                                new Object[]{ffFilename, bucket, key,
-                                                        currentState.getFilePosition(), skipped});
-                                    }
-                                } catch (Exception e) {
-                                    getLogger().info("Failure skipping to 
resume upload flowfile={} bucket={} " +
-                                                    "key={} position={} 
reason={}",
-                                            new Object[]{ffFilename, bucket, 
key, currentState.getFilePosition(),
-                                                    e.getMessage()});
-                                    throw (new ProcessException(e));
+                                final long skipped = in.skip(currentState.getFilePosition());
+                                if (skipped != currentState.getFilePosition()) {
+                                    getLogger().info("Failure skipping to resume upload flowfile={} " +
+                                                    "bucket={} key={} position={} skipped={}",
+                                            new Object[]{ffFilename, bucket, key,
+                                                    currentState.getFilePosition(), skipped});
                                 }
+                            } catch (Exception e) {
+                                getLogger().info("Failure skipping to resume 
upload flowfile={} bucket={} " +
+                                                "key={} position={} reason={}",
+                                        new Object[]{ffFilename, bucket, key, 
currentState.getFilePosition(),
+                                                e.getMessage()});
+                                throw (new ProcessException(e));
                             }
                         }
+                    }
 
-                        // upload parts
-                        //------------------------------------------------------------
-                        long thisPartSize;
-                        boolean isLastPart;
-                        for (int part = currentState.getPartETags().size() + 1;
-                             currentState.getFilePosition() < currentState.getContentLength(); part++) {
-                            if (!PutS3Object.this.isScheduled()) {
-                                throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
-                                        " part=" + part + " uploadId=" + currentState.getUploadId());
-                            }
-                            thisPartSize = Math.min(currentState.getPartSize(),
-                                    (currentState.getContentLength() - currentState.getFilePosition()));
-                            isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
-                            UploadPartRequest uploadRequest = new UploadPartRequest()
-                                    .withBucketName(bucket)
-                                    .withKey(key)
-                                    .withUploadId(currentState.getUploadId())
-                                    .withInputStream(in)
-                                    .withPartNumber(part)
-                                    .withPartSize(thisPartSize)
-                                    .withLastPart(isLastPart);
-                            if (encryptionService != null) {
-                                encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
+                    // upload parts
+                    //------------------------------------------------------------
+                    long thisPartSize;
+                    boolean isLastPart;
+                    for (int part = currentState.getPartETags().size() + 1;
+                         currentState.getFilePosition() < currentState.getContentLength(); part++) {
+                        if (!PutS3Object.this.isScheduled()) {
+                            throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
+                                    " part=" + part + " uploadId=" + currentState.getUploadId());
+                        }
+                        thisPartSize = Math.min(currentState.getPartSize(),
+                                (currentState.getContentLength() - currentState.getFilePosition()));
+                        isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
+                        UploadPartRequest uploadRequest = new UploadPartRequest()
+                                .withBucketName(bucket)
+                                .withKey(key)
+                                .withUploadId(currentState.getUploadId())
+                                .withInputStream(in)
+                                .withPartNumber(part)
+                                .withPartSize(thisPartSize)
+                                .withLastPart(isLastPart);
+                        if (encryptionService != null) {
+                            encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
+                        }
+                        try {
+                            UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
+                            currentState.addPartETag(uploadPartResult.getPartETag());
+                            currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+                            try {
+                                persistLocalState(cacheKey, currentState);
+                            } catch (Exception e) {
+                                getLogger().info("Exception saving cache state 
processing flow file: " +
+                                        e.getMessage());
                             }
+                            int available = 0;
                             try {
-                                UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
-                                currentState.addPartETag(uploadPartResult.getPartETag());
-                                currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
-                                try {
-                                    persistLocalState(cacheKey, currentState);
-                                } catch (Exception e) {
-                                    getLogger().info("Exception saving cache 
state processing flow file: " +
-                                            e.getMessage());
-                                }
-                                int available = 0;
-                                try {
-                                    available = in.available();
-                                } catch (IOException e) {
-                                    // in case of the last part, the stream is already closed
-                                }
-                                getLogger().info("Success uploading part 
flowfile={} part={} available={} " +
-                                        "etag={} uploadId={}", new 
Object[]{ffFilename, part, available,
-                                        uploadPartResult.getETag(), 
currentState.getUploadId()});
-                            } catch (AmazonClientException e) {
-                                getLogger().info("Failure uploading part 
flowfile={} part={} bucket={} key={} " +
-                                        "reason={}", new Object[]{ffFilename, 
part, bucket, key, e.getMessage()});
-                                throw (e);
+                                available = in.available();
+                            } catch (IOException e) {
+                                // in case of the last part, the stream is already closed
                             }
+                            getLogger().info("Success uploading part 
flowfile={} part={} available={} " +
+                                    "etag={} uploadId={}", new 
Object[]{ffFilename, part, available,
+                                    uploadPartResult.getETag(), 
currentState.getUploadId()});
+                        } catch (AmazonClientException e) {
+                            getLogger().info("Failure uploading part 
flowfile={} part={} bucket={} key={} " +
+                                    "reason={}", new Object[]{ffFilename, 
part, bucket, key, e.getMessage()});
+                            throw (e);
                         }
+                    }
 
-                        // complete multipart upload
-                        //------------------------------------------------------------
-                        CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
-                                bucket, key, currentState.getUploadId(), currentState.getPartETags());
+                    // complete multipart upload
+                    //------------------------------------------------------------
+                    CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
+                            bucket, key, currentState.getUploadId(), currentState.getPartETags());
 
-                        // No call to an encryption service is needed for a CompleteMultipartUploadRequest.
-                        try {
-                            CompleteMultipartUploadResult completeResult =
-                                    s3.completeMultipartUpload(completeRequest);
-                            getLogger().info("Success completing upload 
flowfile={} etag={} uploadId={}",
-                                    new Object[]{ffFilename, 
completeResult.getETag(), currentState.getUploadId()});
-                            if (completeResult.getVersionId() != null) {
-                                attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
-                            }
-                            if (completeResult.getETag() != null) {
-                                attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
-                            }
-                            if (completeResult.getExpirationTime() != null) {
-                                attributes.put(S3_EXPIRATION_ATTR_KEY,
-                                        completeResult.getExpirationTime().toString());
-                            }
-                            if (currentState.getStorageClass() != null) {
-                                attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
-                            }
-                            if (userMetadata.size() > 0) {
-                                StringBuilder userMetaBldr = new StringBuilder();
-                                for (String userKey : userMetadata.keySet()) {
-                                    userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
-                                }
-                                attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+                    // No call to an encryption service is needed for a CompleteMultipartUploadRequest.
+                    try {
+                        CompleteMultipartUploadResult completeResult =
+                                s3.completeMultipartUpload(completeRequest);
+                        getLogger().info("Success completing upload 
flowfile={} etag={} uploadId={}",
+                                new Object[]{ffFilename, 
completeResult.getETag(), currentState.getUploadId()});
+                        if (completeResult.getVersionId() != null) {
+                            attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
+                        }
+                        if (completeResult.getETag() != null) {
+                            attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
+                        }
+                        if (completeResult.getExpirationTime() != null) {
+                            attributes.put(S3_EXPIRATION_ATTR_KEY,
+                                    completeResult.getExpirationTime().toString());
+                        }
+                        if (currentState.getStorageClass() != null) {
+                            attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
+                        }
+                        if (userMetadata.size() > 0) {
+                            StringBuilder userMetaBldr = new StringBuilder();
+                            for (String userKey : userMetadata.keySet()) {
+                                userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
                             }
-                            attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
-                        } catch (AmazonClientException e) {
-                            getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
-                                    new Object[]{ffFilename, bucket, key, e.getMessage()});
-                            throw (e);
+                            attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
                         }
+                        attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
+                    } catch (AmazonClientException e) {
+                        getLogger().info("Failure completing upload 
flowfile={} bucket={} key={} reason={}",
+                                new Object[]{ffFilename, bucket, key, 
e.getMessage()});
+                        throw (e);
                     }
                 }
-            });
+            } catch (IOException e) {
+                getLogger().error("Error during upload of flow files: " + 
e.getMessage());
+                throw e;
+            }
 
             if (!attributes.isEmpty()) {
                 flowFile = session.putAllAttributes(flowFile, attributes);
@@ -852,25 +865,25 @@ public class PutS3Object extends AbstractS3Processor {
             final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, url, millis);
 
-            getLogger().info("Successfully put {} to Amazon S3 in {} 
milliseconds", new Object[] {ff, millis});
+            getLogger().info("Successfully put {} to Amazon S3 in {} 
milliseconds", new Object[]{ff, millis});
             try {
                 removeLocalState(cacheKey);
             } catch (IOException e) {
                 getLogger().info("Error trying to delete key {} from cache: 
{}",
                         new Object[]{cacheKey, e.getMessage()});
             }
-        } catch (final ProcessException | AmazonClientException pe) {
-            extractExceptionDetails(pe, session, flowFile);
-            if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
-                getLogger().info(pe.getMessage());
+
+        } catch (final ProcessException | AmazonClientException | IOException e) {
+            extractExceptionDetails(e, session, flowFile);
+            if (e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
+                getLogger().info(e.getMessage());
                 session.rollback();
             } else {
-                getLogger().error("Failed to put {} to Amazon S3 due to {}", 
new Object[]{flowFile, pe});
+                getLogger().error("Failed to put {} to Amazon S3 due to {}", 
flowFile, e);
                 flowFile = session.penalize(flowFile);
                 session.transfer(flowFile, REL_FAILURE);
             }
         }
-
     }
 
     private final Lock s3BucketLock = new ReentrantLock();
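Beyond the new properties, the rewrite above replaces the anonymous InputStreamCallback with a try-with-resources block whose stream comes from either the FileResource or session.read(), which is why IOException now joins ProcessException and AmazonClientException in the outer catch. A rough control-flow sketch follows; upload(), rollback() and routeToFailure() are hypothetical placeholders for the session/relationship plumbing, and the unscheduled-message check mimics S3_PROCESS_UNSCHEDULED_MESSAGE.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Optional;
    import java.util.function.Supplier;

    class UploadFlowSketch {
        static final String UNSCHEDULED_MESSAGE = "Processing was unscheduled"; // placeholder marker

        static void onTrigger(Optional<InputStream> fileResourceStream, Supplier<InputStream> flowFileContent) {
            try {
                // Stream resolved once: file resource if configured, FlowFile content otherwise.
                try (InputStream in = fileResourceStream.orElseGet(flowFileContent)) {
                    upload(in); // single-part or multipart, as in the diff
                } catch (IOException e) {
                    throw e; // logged in the real code, then rethrown to the outer handler
                }
            } catch (IOException | RuntimeException e) {
                if (String.valueOf(e.getMessage()).contains(UNSCHEDULED_MESSAGE)) {
                    rollback(); // unscheduled mid-upload: leave the FlowFile for a retry
                } else {
                    routeToFailure(); // penalize and transfer to REL_FAILURE
                }
            }
        }

        static void upload(InputStream in) throws IOException {
            while (in.read() != -1) { /* drain; placeholder for the S3 calls */ }
        }

        static void rollback() { }

        static void routeToFailure() { }

        public static void main(String[] args) {
            onTrigger(Optional.empty(), () -> new ByteArrayInputStream("data".getBytes()));
        }
    }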
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 80b3950716..f8eb6dd5b2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -41,9 +41,12 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.localstack.LocalStackContainer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -70,6 +73,8 @@ import static org.junit.jupiter.api.Assertions.fail;
  * @see ITListS3
  */
 public abstract class AbstractS3IT {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractS3IT.class);
+
     protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
     protected final static String BUCKET_NAME = "test-bucket-" + System.currentTimeMillis();
 
@@ -82,7 +87,6 @@ public abstract class AbstractS3IT {
     private static final LocalStackContainer localstack = new LocalStackContainer(localstackImage)
             .withServices(LocalStackContainer.Service.S3, LocalStackContainer.Service.KMS);
 
-
     @BeforeAll
     public static void oneTimeSetup() {
         localstack.start();
@@ -101,6 +105,45 @@ public abstract class AbstractS3IT {
         client.createBucket(request);
     }
 
+    @BeforeEach
+    public void clearKeys() {
+        addedKeys.clear();
+    }
+
+    @AfterEach
+    public void emptyBucket() {
+        if (!client.doesBucketExistV2(BUCKET_NAME)) {
+            return;
+        }
+
+        ObjectListing objectListing = client.listObjects(BUCKET_NAME);
+        while (true) {
+            for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+                client.deleteObject(BUCKET_NAME, objectSummary.getKey());
+            }
+
+            if (objectListing.isTruncated()) {
+                objectListing = client.listNextBatchOfObjects(objectListing);
+            } else {
+                break;
+            }
+        }
+    }
+
+    @AfterAll
+    public static void oneTimeTearDown() {
+        try {
+            if (client == null || !client.doesBucketExistV2(BUCKET_NAME)) {
+                return;
+            }
+
+            DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME);
+            client.deleteBucket(dbr);
+        } catch (final AmazonS3Exception e) {
+            logger.error("Unable to delete bucket {}", BUCKET_NAME, e);
+        }
+    }
+
     protected AmazonS3 getClient() {
         return client;
     }
@@ -121,44 +164,6 @@ public abstract class AbstractS3IT {
         AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), localstack.getSecretKey());
     }
 
-    @BeforeEach
-    public void clearKeys() {
-        addedKeys.clear();
-    }
-
-    @AfterAll
-    public static void oneTimeTearDown() {
-        // Empty the bucket before deleting it.
-        try {
-            if (client == null) {
-                return;
-            }
-
-            ObjectListing objectListing = client.listObjects(BUCKET_NAME);
-
-            while (true) {
-                for (S3ObjectSummary objectSummary : 
objectListing.getObjectSummaries()) {
-                    client.deleteObject(BUCKET_NAME, objectSummary.getKey());
-                }
-
-                if (objectListing.isTruncated()) {
-                    objectListing = client.listNextBatchOfObjects(objectListing);
-                } else {
-                    break;
-                }
-            }
-
-            DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME);
-            client.deleteBucket(dbr);
-        } catch (final AmazonS3Exception e) {
-            System.err.println("Unable to delete bucket " + BUCKET_NAME + e.toString());
-        }
-
-        if (client.doesBucketExistV2(BUCKET_NAME)) {
-            fail("Incomplete teardown, subsequent tests might fail");
-        }
-    }
-
     protected void putTestFile(String key, File file) throws AmazonS3Exception {
         PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file);
         client.putObject(putRequest);
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index cda5e528db..43f936c9f0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -24,13 +24,17 @@ import com.amazonaws.services.s3.model.MultipartUpload;
 import com.amazonaws.services.s3.model.MultipartUploadListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.StorageClass;
 import com.amazonaws.services.s3.model.Tag;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.nifi.fileresource.service.StandardFileResourceService;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -53,6 +57,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,6 +98,61 @@ public class ITPutS3Object extends AbstractS3IT {
         kmsKeyId = getKMSKey();
     }
 
+    @Test
+    public void testPutFromLocalFile() throws Exception {
+        TestRunner runner = initTestRunner();
+        String attributeName = "file.path";
+        Path resourcePath = getResourcePath(SAMPLE_FILE_RESOURCE_NAME);
+
+        String serviceId = FileResourceService.class.getSimpleName();
+        FileResourceService service = new StandardFileResourceService();
+        runner.addControllerService(serviceId, service);
+        runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
+        runner.enableControllerService(service);
+
+        runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+        runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(attributeName, resourcePath.toString());
+        runner.enqueue(resourcePath, attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).getFirst();
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        List<S3ObjectSummary> objectSummaries = getClient().listObjects(BUCKET_NAME).getObjectSummaries();
+        assertThat(objectSummaries, hasSize(1));
+        assertEquals(resourcePath.getFileName().toString(), objectSummaries.getFirst().getKey());
+        assertThat(objectSummaries.getFirst().getSize(), greaterThan(0L));
+    }
+
+    @Test
+    public void testPutFromNonExistentLocalFile() throws Exception {
+        TestRunner runner = initTestRunner();
+        String attributeName = "file.path";
+
+        String serviceId = FileResourceService.class.getSimpleName();
+        FileResourceService service = new StandardFileResourceService();
+        runner.addControllerService(serviceId, service);
+        runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
+        runner.enableControllerService(service);
+
+        runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE);
+        runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+        String filePath = "nonexistent.txt";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(attributeName, filePath);
+
+        runner.enqueue("", attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1);
+        assertThat(getClient().listObjects(BUCKET_NAME).getObjectSummaries(), empty());
+    }
 
     @Test
     public void testSimplePut() throws IOException {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index fff03d7510..ab7bf59fb4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -37,10 +37,13 @@ import com.amazonaws.services.s3.model.StorageClass;
 import com.amazonaws.services.s3.model.Tag;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.aws.signer.AwsSignerType;
 import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -50,6 +53,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.File;
+import java.io.InputStream;
 import java.net.URLEncoder;
 import java.util.Date;
 import java.util.HashMap;
@@ -57,11 +61,16 @@ import java.util.List;
 import java.util.Map;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class TestPutS3Object {
@@ -87,6 +96,33 @@ public class TestPutS3Object {
         runner.setEnvironmentVariableValue("java.io.tmpdir", System.getProperty("java.io.tmpdir"));
     }
 
+    @Test
+    public void testPutSinglePartFromLocalFileSource() throws Exception {
+        prepareTest();
+
+        String serviceId = "fileresource";
+        FileResourceService service = mock(FileResourceService.class);
+        InputStream localFileInputStream = mock(InputStream.class);
+        when(service.getIdentifier()).thenReturn(serviceId);
+        long contentLength = 10L;
+        when(service.getFileResource(anyMap())).thenReturn(new FileResource(localFileInputStream, contentLength));
+
+        runner.addControllerService(serviceId, service);
+        runner.enableControllerService(service);
+        runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+        runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+        runner.run();
+
+        ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
+        verify(mockS3Client).putObject(captureRequest.capture());
+        PutObjectRequest putObjectRequest = captureRequest.getValue();
+        assertEquals(localFileInputStream, putObjectRequest.getInputStream());
+        assertEquals(contentLength, putObjectRequest.getMetadata().getContentLength());
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+    }
+
     @Test
     public void testPutSinglePart() {
         runner.setProperty("x-custom-prop", "hello");
@@ -95,7 +131,7 @@ public class TestPutS3Object {
         runner.run(1);
 
         ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
-        Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+        verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
         PutObjectRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
 
@@ -105,6 +141,7 @@ public class TestPutS3Object {
         MockFlowFile ff0 = flowFiles.get(0);
 
         ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt");
+        ff0.assertContentEquals("Test Content");
         ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
         ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version");
     }
@@ -113,7 +150,7 @@ public class TestPutS3Object {
     public void testPutSinglePartException() {
         prepareTest();
 
-        Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail"));
+        when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail"));
 
         runner.run(1);
 
@@ -150,7 +187,7 @@ public class TestPutS3Object {
         runner.run(1);
 
         ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
-        Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+        verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
         PutObjectRequest request = captureRequest.getValue();
 
         List<Tag> tagSet = request.getTagging().getTagSet();
@@ -169,7 +206,7 @@ public class TestPutS3Object {
             runner.run(1);
 
             ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
-            Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+            verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
             PutObjectRequest request = captureRequest.getValue();
 
             assertEquals(storageClass.toString(), request.getStorageClass());
@@ -185,7 +222,7 @@ public class TestPutS3Object {
         runner.run(1);
 
         ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
-        Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+        verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
         PutObjectRequest request = captureRequest.getValue();
 
         ObjectMetadata objectMetadata = request.getMetadata();
@@ -241,10 +278,10 @@ public class TestPutS3Object {
         putObjectResult.setVersionId("test-version");
         putObjectResult.setETag("test-etag");
 
-        Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
+        when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
 
         MultipartUploadListing uploadListing = new MultipartUploadListing();
-        Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
+        when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
     }
 
     @Test

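A note for reviewers following the tests above: testPutSinglePartFromLocalFileSource verifies that the stream and content length supplied by the FileResourceService end up on the PutObjectRequest, rather than the FlowFile content. A minimal sketch of that single-part upload path is shown below. Only FileResource, its getInputStream()/getSize() accessors, and the AWS SDK request types appear in the diff; the helper class, method name, and bucket/key parameters here are illustrative assumptions, not the committed implementation.

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.ObjectMetadata;
    import com.amazonaws.services.s3.model.PutObjectRequest;
    import org.apache.nifi.fileresource.service.api.FileResource;

    // Sketch only: upload a local file described by a FileResource as a
    // single-part S3 object, mirroring the interactions the mock verifies.
    class LocalFileUploadSketch {
        static void upload(AmazonS3 client, String bucket, String key, FileResource resource) {
            ObjectMetadata metadata = new ObjectMetadata();
            // Content length comes from the local file resource, not the FlowFile.
            metadata.setContentLength(resource.getSize());
            // The local file's stream is sent to S3 directly.
            client.putObject(new PutObjectRequest(bucket, key, resource.getInputStream(), metadata));
        }
    }

The integration test testPutFromLocalFile exercises the same path end to end: the FlowFile carries only the file.path attribute, while the uploaded bytes come from the local file resolved by StandardFileResourceService.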