This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bce14f573b NIFI-12643 Added support for FileResourceService in PutGCSObject
bce14f573b is described below

commit bce14f573b57685637d776333029641e62730d26
Author: Balázs Gerner <balazsger...@gmail.com>
AuthorDate: Fri Jan 19 09:49:30 2024 +0100

    NIFI-12643 Added support for FileResourceService in PutGCSObject
    
    This closes #8281.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |   5 +
 .../nifi/processors/gcp/storage/PutGCSObject.java  | 375 +++++++++++----------
 .../processors/gcp/storage/PutGCSObjectTest.java   |  55 ++-
 3 files changed, 242 insertions(+), 193 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 70f104365e..897e710395 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -69,6 +69,11 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-resource-transfer</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-file-resource-service-api</artifactId>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 07546637da..438fac19b8 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -22,15 +22,6 @@ import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -43,13 +34,23 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.fileresource.service.api.FileResource;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static 
com.google.cloud.storage.Storage.PredefinedAcl.ALL_AUTHENTICATED_USERS;
 import static 
com.google.cloud.storage.Storage.PredefinedAcl.AUTHENTICATED_READ;
@@ -102,6 +103,9 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TI
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_DESC;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
 
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -290,6 +294,8 @@ public class PutGCSObject extends AbstractGCSProcessor {
         final List<PropertyDescriptor> descriptors = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         descriptors.add(BUCKET);
         descriptors.add(KEY);
+        descriptors.add(RESOURCE_TRANSFER_SOURCE);
+        descriptors.add(FILE_RESOURCE_SERVICE);
         descriptors.add(CONTENT_TYPE);
         descriptors.add(CRC32C);
         descriptors.add(ACL);
@@ -322,199 +328,196 @@ public class PutGCSObject extends AbstractGCSProcessor {
             return;
         }
 
-        final long startNanos = System.nanoTime();
+        try {
+            final long startNanos = System.nanoTime();
+
+            final String bucket = context.getProperty(BUCKET)
+                    .evaluateAttributeExpressions(flowFile)
+                    .getValue();
+            final String key = context.getProperty(KEY)
+                    .evaluateAttributeExpressions(flowFile)
+                    .getValue();
+            final boolean overwrite = 
context.getProperty(OVERWRITE).asBoolean();
+
+            final FlowFile ff = flowFile;
+            final String ffFilename = 
ff.getAttributes().get(CoreAttributes.FILENAME.key());
+            final Map<String, String> attributes = new HashMap<>();
+            final ResourceTransferSource resourceTransferSource = 
context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
+            final Storage storage = getCloudService();
 
-        final String bucket = context.getProperty(BUCKET)
-                .evaluateAttributeExpressions(flowFile)
-                .getValue();
-        final String key = context.getProperty(KEY)
-                .evaluateAttributeExpressions(flowFile)
-                .getValue();
-        final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
+            try (final InputStream inputStream = 
getFileResource(resourceTransferSource, context, flowFile.getAttributes())
+                    .map(FileResource::getInputStream)
+                    .orElseGet(() -> session.read(ff))) {
 
-        final FlowFile ff = flowFile;
-        final String ffFilename = 
ff.getAttributes().get(CoreAttributes.FILENAME.key());
-        final Map<String, String> attributes = new HashMap<>();
+                final BlobId id = BlobId.of(bucket, key);
+                final BlobInfo.Builder blobInfoBuilder = 
BlobInfo.newBuilder(id);
+                final List<Storage.BlobWriteOption> blobWriteOptions = new 
ArrayList<>();
 
-        try {
-            final Storage storage = getCloudService();
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(InputStream rawIn) throws IOException {
-                    try (final InputStream in = new 
BufferedInputStream(rawIn)) {
-                        final BlobId id = BlobId.of(bucket, key);
-                        final BlobInfo.Builder blobInfoBuilder = 
BlobInfo.newBuilder(id);
-                        final List<Storage.BlobWriteOption> blobWriteOptions = 
new ArrayList<>();
-
-                        if (!overwrite) {
-                            
blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist());
-                        }
+                if (!overwrite) {
+                    
blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist());
+                }
 
-                        final String contentDispositionType = 
context.getProperty(CONTENT_DISPOSITION_TYPE).getValue();
-                        if (contentDispositionType != null) {
-                            
blobInfoBuilder.setContentDisposition(contentDispositionType + "; filename=" + 
ffFilename);
-                        }
+                final String contentDispositionType = 
context.getProperty(CONTENT_DISPOSITION_TYPE).getValue();
+                if (contentDispositionType != null) {
+                    
blobInfoBuilder.setContentDisposition(contentDispositionType + "; filename=" + 
ffFilename);
+                }
 
-                        final String contentType = 
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(ff).getValue();
-                        if (contentType != null) {
-                            blobInfoBuilder.setContentType(contentType);
-                        }
+                final String contentType = 
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(ff).getValue();
+                if (contentType != null) {
+                    blobInfoBuilder.setContentType(contentType);
+                }
 
-                        final String crc32c = 
context.getProperty(CRC32C).evaluateAttributeExpressions(ff).getValue();
-                        if (crc32c != null) {
-                            blobInfoBuilder.setCrc32c(crc32c);
-                            
blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch());
-                        }
+                final String crc32c = 
context.getProperty(CRC32C).evaluateAttributeExpressions(ff).getValue();
+                if (crc32c != null) {
+                    blobInfoBuilder.setCrc32c(crc32c);
+                    
blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch());
+                }
 
-                        final String acl = context.getProperty(ACL).getValue();
-                        if (acl != null) {
-                            
blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl(
-                                    Storage.PredefinedAcl.valueOf(acl)
-                            ));
-                        }
+                final String acl = context.getProperty(ACL).getValue();
+                if (acl != null) {
+                    blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl(
+                            Storage.PredefinedAcl.valueOf(acl)
+                    ));
+                }
 
-                        final String encryptionKey = 
context.getProperty(ENCRYPTION_KEY)
-                                .evaluateAttributeExpressions(ff).getValue();
-                        if (encryptionKey != null) {
-                            
blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey));
-                        }
+                final String encryptionKey = 
context.getProperty(ENCRYPTION_KEY)
+                        .evaluateAttributeExpressions(ff).getValue();
+                if (encryptionKey != null) {
+                    
blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey));
+                }
 
-                        final boolean gzipCompress = 
context.getProperty(GZIPCONTENT).asBoolean();
-                        if (!gzipCompress){
-                            
blobWriteOptions.add(Storage.BlobWriteOption.disableGzipContent());
-                        }
+                final boolean gzipCompress = 
context.getProperty(GZIPCONTENT).asBoolean();
+                if (!gzipCompress) {
+                    
blobWriteOptions.add(Storage.BlobWriteOption.disableGzipContent());
+                }
 
-                        final HashMap<String, String> userMetadata = new 
HashMap<>();
-                        for (final Map.Entry<PropertyDescriptor, String> entry 
: context.getProperties().entrySet()) {
-                            if (entry.getKey().isDynamic()) {
-                                final String value = context.getProperty(
-                                        
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
-                                userMetadata.put(entry.getKey().getName(), 
value);
-                            }
-                        }
+                final HashMap<String, String> userMetadata = new HashMap<>();
+                for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
+                    if (entry.getKey().isDynamic()) {
+                        final String value = context.getProperty(
+                                
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+                        userMetadata.put(entry.getKey().getName(), value);
+                    }
+                }
+
+                if (!userMetadata.isEmpty()) {
+                    blobInfoBuilder.setMetadata(userMetadata);
+                }
 
-                        if (!userMetadata.isEmpty()) {
-                            blobInfoBuilder.setMetadata(userMetadata);
+                try {
+                    final Blob blob = 
storage.createFrom(blobInfoBuilder.build(),
+                            inputStream,
+                            blobWriteOptions.toArray(new 
Storage.BlobWriteOption[blobWriteOptions.size()])
+                    );
+
+                    // Create attributes
+                    attributes.put(BUCKET_ATTR, blob.getBucket());
+                    attributes.put(KEY_ATTR, blob.getName());
+
+
+                    if (blob.getSize() != null) {
+                        attributes.put(SIZE_ATTR, 
String.valueOf(blob.getSize()));
+                    }
+
+                    if (blob.getCacheControl() != null) {
+                        attributes.put(CACHE_CONTROL_ATTR, 
blob.getCacheControl());
+                    }
+
+                    if (blob.getComponentCount() != null) {
+                        attributes.put(COMPONENT_COUNT_ATTR, 
String.valueOf(blob.getComponentCount()));
+                    }
+
+                    if (blob.getContentDisposition() != null) {
+                        attributes.put(CONTENT_DISPOSITION_ATTR, 
blob.getContentDisposition());
+                        final Util.ParsedContentDisposition parsed = 
Util.parseContentDisposition(blob.getContentDisposition());
+
+                        if (parsed != null) {
+                            attributes.put(CoreAttributes.FILENAME.key(), 
parsed.getFileName());
                         }
+                    }
+
+                    if (blob.getContentEncoding() != null) {
+                        attributes.put(CONTENT_ENCODING_ATTR, 
blob.getContentEncoding());
+                    }
+
+                    if (blob.getContentLanguage() != null) {
+                        attributes.put(CONTENT_LANGUAGE_ATTR, 
blob.getContentLanguage());
+                    }
+
+                    if (blob.getContentType() != null) {
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), 
blob.getContentType());
+                    }
+
+                    if (blob.getCrc32c() != null) {
+                        attributes.put(CRC32C_ATTR, blob.getCrc32c());
+                    }
+
+                    if (blob.getCustomerEncryption() != null) {
+                        final BlobInfo.CustomerEncryption encryption = 
blob.getCustomerEncryption();
 
-                        try {
-                            final Blob blob = 
storage.createFrom(blobInfoBuilder.build(),
-                                    in,
-                                    blobWriteOptions.toArray(new 
Storage.BlobWriteOption[blobWriteOptions.size()])
-                            );
-
-                            // Create attributes
-                            attributes.put(BUCKET_ATTR, blob.getBucket());
-                            attributes.put(KEY_ATTR, blob.getName());
-
-
-                            if (blob.getSize() != null) {
-                                attributes.put(SIZE_ATTR, 
String.valueOf(blob.getSize()));
-                            }
-
-                            if (blob.getCacheControl() != null) {
-                                attributes.put(CACHE_CONTROL_ATTR, 
blob.getCacheControl());
-                            }
-
-                            if (blob.getComponentCount() != null) {
-                                attributes.put(COMPONENT_COUNT_ATTR, 
String.valueOf(blob.getComponentCount()));
-                            }
-
-                            if (blob.getContentDisposition() != null) {
-                                attributes.put(CONTENT_DISPOSITION_ATTR, 
blob.getContentDisposition());
-                                final Util.ParsedContentDisposition parsed = 
Util.parseContentDisposition(blob.getContentDisposition());
-
-                                if (parsed != null) {
-                                    
attributes.put(CoreAttributes.FILENAME.key(), parsed.getFileName());
-                                }
-                            }
-
-                            if (blob.getContentEncoding() != null) {
-                                attributes.put(CONTENT_ENCODING_ATTR, 
blob.getContentEncoding());
-                            }
-
-                            if (blob.getContentLanguage() != null) {
-                                attributes.put(CONTENT_LANGUAGE_ATTR, 
blob.getContentLanguage());
-                            }
-
-                            if (blob.getContentType() != null) {
-                                attributes.put(CoreAttributes.MIME_TYPE.key(), 
blob.getContentType());
-                            }
-
-                            if (blob.getCrc32c() != null) {
-                                attributes.put(CRC32C_ATTR, blob.getCrc32c());
-                            }
-
-                            if (blob.getCustomerEncryption() != null) {
-                                final BlobInfo.CustomerEncryption encryption = 
blob.getCustomerEncryption();
-
-                                attributes.put(ENCRYPTION_ALGORITHM_ATTR, 
encryption.getEncryptionAlgorithm());
-                                attributes.put(ENCRYPTION_SHA256_ATTR, 
encryption.getKeySha256());
-                            }
-
-                            if (blob.getEtag() != null) {
-                                attributes.put(ETAG_ATTR, blob.getEtag());
-                            }
-
-                            if (blob.getGeneratedId() != null) {
-                                attributes.put(GENERATED_ID_ATTR, 
blob.getGeneratedId());
-                            }
-
-                            if (blob.getGeneration() != null) {
-                                attributes.put(GENERATION_ATTR, 
String.valueOf(blob.getGeneration()));
-                            }
-
-                            if (blob.getMd5() != null) {
-                                attributes.put(MD5_ATTR, blob.getMd5());
-                            }
-
-                            if (blob.getMediaLink() != null) {
-                                attributes.put(MEDIA_LINK_ATTR, 
blob.getMediaLink());
-                            }
-
-                            if (blob.getMetageneration() != null) {
-                                attributes.put(METAGENERATION_ATTR, 
String.valueOf(blob.getMetageneration()));
-                            }
-
-                            if (blob.getOwner() != null) {
-                                final Acl.Entity entity = blob.getOwner();
-
-                                if (entity instanceof Acl.User) {
-                                    attributes.put(OWNER_ATTR, ((Acl.User) 
entity).getEmail());
-                                    attributes.put(OWNER_TYPE_ATTR, "user");
-                                } else if (entity instanceof Acl.Group) {
-                                    attributes.put(OWNER_ATTR, ((Acl.Group) 
entity).getEmail());
-                                    attributes.put(OWNER_TYPE_ATTR, "group");
-                                } else if (entity instanceof Acl.Domain) {
-                                    attributes.put(OWNER_ATTR, ((Acl.Domain) 
entity).getDomain());
-                                    attributes.put(OWNER_TYPE_ATTR, "domain");
-                                } else if (entity instanceof Acl.Project) {
-                                    attributes.put(OWNER_ATTR, ((Acl.Project) 
entity).getProjectId());
-                                    attributes.put(OWNER_TYPE_ATTR, "project");
-                                }
-                            }
-
-                            if (blob.getSelfLink() != null) {
-                                attributes.put(URI_ATTR, blob.getSelfLink());
-                            }
-
-                            if (blob.getCreateTimeOffsetDateTime() != null) {
-                                attributes.put(CREATE_TIME_ATTR, 
String.valueOf(blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli()));
-                            }
-
-                            if (blob.getUpdateTimeOffsetDateTime() != null) {
-                                attributes.put(UPDATE_TIME_ATTR, 
String.valueOf(blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli()));
-                            }
-                        } catch (StorageException e) {
-                            getLogger().error("Failure completing upload 
flowfile={} bucket={} key={} reason={}",
-                                    ffFilename, bucket, key, e.getMessage(), 
e);
-                            throw (e);
+                        attributes.put(ENCRYPTION_ALGORITHM_ATTR, 
encryption.getEncryptionAlgorithm());
+                        attributes.put(ENCRYPTION_SHA256_ATTR, 
encryption.getKeySha256());
+                    }
+
+                    if (blob.getEtag() != null) {
+                        attributes.put(ETAG_ATTR, blob.getEtag());
+                    }
+
+                    if (blob.getGeneratedId() != null) {
+                        attributes.put(GENERATED_ID_ATTR, 
blob.getGeneratedId());
+                    }
+
+                    if (blob.getGeneration() != null) {
+                        attributes.put(GENERATION_ATTR, 
String.valueOf(blob.getGeneration()));
+                    }
+
+                    if (blob.getMd5() != null) {
+                        attributes.put(MD5_ATTR, blob.getMd5());
+                    }
+
+                    if (blob.getMediaLink() != null) {
+                        attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink());
+                    }
+
+                    if (blob.getMetageneration() != null) {
+                        attributes.put(METAGENERATION_ATTR, 
String.valueOf(blob.getMetageneration()));
+                    }
+
+                    if (blob.getOwner() != null) {
+                        final Acl.Entity entity = blob.getOwner();
+
+                        if (entity instanceof Acl.User) {
+                            attributes.put(OWNER_ATTR, ((Acl.User) 
entity).getEmail());
+                            attributes.put(OWNER_TYPE_ATTR, "user");
+                        } else if (entity instanceof Acl.Group) {
+                            attributes.put(OWNER_ATTR, ((Acl.Group) 
entity).getEmail());
+                            attributes.put(OWNER_TYPE_ATTR, "group");
+                        } else if (entity instanceof Acl.Domain) {
+                            attributes.put(OWNER_ATTR, ((Acl.Domain) 
entity).getDomain());
+                            attributes.put(OWNER_TYPE_ATTR, "domain");
+                        } else if (entity instanceof Acl.Project) {
+                            attributes.put(OWNER_ATTR, ((Acl.Project) 
entity).getProjectId());
+                            attributes.put(OWNER_TYPE_ATTR, "project");
                         }
+                    }
 
+                    if (blob.getSelfLink() != null) {
+                        attributes.put(URI_ATTR, blob.getSelfLink());
+                    }
+
+                    if (blob.getCreateTimeOffsetDateTime() != null) {
+                        attributes.put(CREATE_TIME_ATTR, 
String.valueOf(blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli()));
+                    }
 
+                    if (blob.getUpdateTimeOffsetDateTime() != null) {
+                        attributes.put(UPDATE_TIME_ATTR, 
String.valueOf(blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli()));
                     }
+                } catch (StorageException | IOException e) {
+                    getLogger().error("Failure completing upload flowfile={} 
bucket={} key={} reason={}",
+                            ffFilename, bucket, key, e.getMessage(), e);
+                    throw (e);
                 }
-            });
+            }
 
             if (!attributes.isEmpty()) {
                 flowFile = session.putAllAttributes(flowFile, attributes);
@@ -527,7 +530,7 @@ public class PutGCSObject extends AbstractGCSProcessor {
             getLogger().info("Successfully put {} to Google Cloud Storage in 
{} milliseconds",
                     new Object[]{ff, millis});
 
-        } catch (final ProcessException | StorageException e) {
+        } catch (final ProcessException | StorageException | IOException e) {
             getLogger().error("Failed to put {} to Google Cloud Storage due to 
{}", flowFile, e.getMessage(), e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
index 2cb446d87a..0277ce0d6b 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
@@ -22,6 +22,17 @@ import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
 import java.io.InputStream;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -31,13 +42,6 @@ import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
 
 import static com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_READ;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
@@ -62,10 +66,13 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYP
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static 
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -174,6 +181,40 @@ public class PutGCSObjectTest extends AbstractGCSTest {
         assertNull(blobInfo.getCrc32c());
     }
 
+    @Test
+    public void testSuccessfulPutOperationFromLocalFileSource() throws 
Exception {
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
+        final PutGCSObject processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
+        addRequiredPropertiesToRunner(runner);
+
+        String serviceId = "fileresource";
+        FileResourceService service = mock(FileResourceService.class);
+        InputStream localFileInputStream = mock(InputStream.class);
+        when(service.getIdentifier()).thenReturn(serviceId);
+        when(service.getFileResource(anyMap())).thenReturn(new 
FileResource(localFileInputStream, 10));
+
+        runner.addControllerService(serviceId, service);
+        runner.enableControllerService(service);
+        runner.setProperty(RESOURCE_TRANSFER_SOURCE, 
ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+        runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+        runner.assertValid();
+
+        when(storage.createFrom(blobInfoArgumentCaptor.capture(),
+                inputStreamArgumentCaptor.capture(),
+                blobWriteOptionArgumentCaptor.capture())).thenReturn(blob);
+
+        runner.enqueue("test");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS);
+        runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1);
+        assertEquals(inputStreamArgumentCaptor.getValue(), 
localFileInputStream);
+    }
+
     @Test
     public void testSuccessfulPutOperation() throws Exception {
         reset(storageOptions, storage, blob);

Reply via email to