This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new bce14f573b NIFI-12643 Added support for FileResourceService in PutGCSObject bce14f573b is described below commit bce14f573b57685637d776333029641e62730d26 Author: Balázs Gerner <balazsger...@gmail.com> AuthorDate: Fri Jan 19 09:49:30 2024 +0100 NIFI-12643 Added support for FileResourceService in PutGCSObject This closes #8281. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 5 + .../nifi/processors/gcp/storage/PutGCSObject.java | 375 +++++++++++---------- .../processors/gcp/storage/PutGCSObjectTest.java | 55 ++- 3 files changed, 242 insertions(+), 193 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index 70f104365e..897e710395 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -69,6 +69,11 @@ <version>2.0.0-SNAPSHOT</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-resource-transfer</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-file-resource-service-api</artifactId> diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java index 07546637da..438fac19b8 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java @@ -22,15 +22,6 @@ import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -43,13 +34,23 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.fileresource.service.api.FileResource; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.transfer.ResourceTransferSource; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import static com.google.cloud.storage.Storage.PredefinedAcl.ALL_AUTHENTICATED_USERS; import static com.google.cloud.storage.Storage.PredefinedAcl.AUTHENTICATED_READ; @@ -102,6 +103,9 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TI import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_DESC; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; +import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -290,6 +294,8 @@ public class PutGCSObject extends AbstractGCSProcessor { final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); descriptors.add(BUCKET); descriptors.add(KEY); + descriptors.add(RESOURCE_TRANSFER_SOURCE); + descriptors.add(FILE_RESOURCE_SERVICE); descriptors.add(CONTENT_TYPE); descriptors.add(CRC32C); descriptors.add(ACL); @@ -322,199 +328,196 @@ public class PutGCSObject extends AbstractGCSProcessor { return; } - final long startNanos = System.nanoTime(); + try { + final long startNanos = System.nanoTime(); + + final String bucket = context.getProperty(BUCKET) + .evaluateAttributeExpressions(flowFile) + .getValue(); + final String key = context.getProperty(KEY) + .evaluateAttributeExpressions(flowFile) + .getValue(); + final boolean overwrite = context.getProperty(OVERWRITE).asBoolean(); + + final FlowFile ff = flowFile; + final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key()); + final Map<String, String> attributes = new HashMap<>(); + final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class); + final Storage storage = getCloudService(); - final String bucket = context.getProperty(BUCKET) - .evaluateAttributeExpressions(flowFile) - .getValue(); - final String key = context.getProperty(KEY) - .evaluateAttributeExpressions(flowFile) - .getValue(); - final boolean overwrite = context.getProperty(OVERWRITE).asBoolean(); + try (final InputStream inputStream = getFileResource(resourceTransferSource, context, flowFile.getAttributes()) + .map(FileResource::getInputStream) + .orElseGet(() -> session.read(ff))) { - final FlowFile ff = flowFile; - final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key()); - final Map<String, String> attributes = new HashMap<>(); + final BlobId id = BlobId.of(bucket, key); + final BlobInfo.Builder blobInfoBuilder = BlobInfo.newBuilder(id); + final List<Storage.BlobWriteOption> blobWriteOptions = new ArrayList<>(); - try { - final Storage storage = getCloudService(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - final BlobId id = BlobId.of(bucket, key); - final BlobInfo.Builder blobInfoBuilder = BlobInfo.newBuilder(id); - final List<Storage.BlobWriteOption> blobWriteOptions = new ArrayList<>(); - - if (!overwrite) { - blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist()); - } + if (!overwrite) { + blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist()); + } - final String contentDispositionType = context.getProperty(CONTENT_DISPOSITION_TYPE).getValue(); - if (contentDispositionType != null) { - blobInfoBuilder.setContentDisposition(contentDispositionType + "; filename=" + ffFilename); - } + final String contentDispositionType = context.getProperty(CONTENT_DISPOSITION_TYPE).getValue(); + if (contentDispositionType != null) { + blobInfoBuilder.setContentDisposition(contentDispositionType + "; filename=" + ffFilename); + } - final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(ff).getValue(); - if (contentType != null) { - blobInfoBuilder.setContentType(contentType); - } + final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(ff).getValue(); + if (contentType != null) { + blobInfoBuilder.setContentType(contentType); + } - final String crc32c = context.getProperty(CRC32C).evaluateAttributeExpressions(ff).getValue(); - if (crc32c != null) { - blobInfoBuilder.setCrc32c(crc32c); - blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch()); - } + final String crc32c = context.getProperty(CRC32C).evaluateAttributeExpressions(ff).getValue(); + if (crc32c != null) { + blobInfoBuilder.setCrc32c(crc32c); + blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch()); + } - final String acl = context.getProperty(ACL).getValue(); - if (acl != null) { - blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl( - Storage.PredefinedAcl.valueOf(acl) - )); - } + final String acl = context.getProperty(ACL).getValue(); + if (acl != null) { + blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl( + Storage.PredefinedAcl.valueOf(acl) + )); + } - final String encryptionKey = context.getProperty(ENCRYPTION_KEY) - .evaluateAttributeExpressions(ff).getValue(); - if (encryptionKey != null) { - blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey)); - } + final String encryptionKey = context.getProperty(ENCRYPTION_KEY) + .evaluateAttributeExpressions(ff).getValue(); + if (encryptionKey != null) { + blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey)); + } - final boolean gzipCompress = context.getProperty(GZIPCONTENT).asBoolean(); - if (!gzipCompress){ - blobWriteOptions.add(Storage.BlobWriteOption.disableGzipContent()); - } + final boolean gzipCompress = context.getProperty(GZIPCONTENT).asBoolean(); + if (!gzipCompress) { + blobWriteOptions.add(Storage.BlobWriteOption.disableGzipContent()); + } - final HashMap<String, String> userMetadata = new HashMap<>(); - for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - if (entry.getKey().isDynamic()) { - final String value = context.getProperty( - entry.getKey()).evaluateAttributeExpressions(ff).getValue(); - userMetadata.put(entry.getKey().getName(), value); - } - } + final HashMap<String, String> userMetadata = new HashMap<>(); + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + final String value = context.getProperty( + entry.getKey()).evaluateAttributeExpressions(ff).getValue(); + userMetadata.put(entry.getKey().getName(), value); + } + } + + if (!userMetadata.isEmpty()) { + blobInfoBuilder.setMetadata(userMetadata); + } - if (!userMetadata.isEmpty()) { - blobInfoBuilder.setMetadata(userMetadata); + try { + final Blob blob = storage.createFrom(blobInfoBuilder.build(), + inputStream, + blobWriteOptions.toArray(new Storage.BlobWriteOption[blobWriteOptions.size()]) + ); + + // Create attributes + attributes.put(BUCKET_ATTR, blob.getBucket()); + attributes.put(KEY_ATTR, blob.getName()); + + + if (blob.getSize() != null) { + attributes.put(SIZE_ATTR, String.valueOf(blob.getSize())); + } + + if (blob.getCacheControl() != null) { + attributes.put(CACHE_CONTROL_ATTR, blob.getCacheControl()); + } + + if (blob.getComponentCount() != null) { + attributes.put(COMPONENT_COUNT_ATTR, String.valueOf(blob.getComponentCount())); + } + + if (blob.getContentDisposition() != null) { + attributes.put(CONTENT_DISPOSITION_ATTR, blob.getContentDisposition()); + final Util.ParsedContentDisposition parsed = Util.parseContentDisposition(blob.getContentDisposition()); + + if (parsed != null) { + attributes.put(CoreAttributes.FILENAME.key(), parsed.getFileName()); } + } + + if (blob.getContentEncoding() != null) { + attributes.put(CONTENT_ENCODING_ATTR, blob.getContentEncoding()); + } + + if (blob.getContentLanguage() != null) { + attributes.put(CONTENT_LANGUAGE_ATTR, blob.getContentLanguage()); + } + + if (blob.getContentType() != null) { + attributes.put(CoreAttributes.MIME_TYPE.key(), blob.getContentType()); + } + + if (blob.getCrc32c() != null) { + attributes.put(CRC32C_ATTR, blob.getCrc32c()); + } + + if (blob.getCustomerEncryption() != null) { + final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption(); - try { - final Blob blob = storage.createFrom(blobInfoBuilder.build(), - in, - blobWriteOptions.toArray(new Storage.BlobWriteOption[blobWriteOptions.size()]) - ); - - // Create attributes - attributes.put(BUCKET_ATTR, blob.getBucket()); - attributes.put(KEY_ATTR, blob.getName()); - - - if (blob.getSize() != null) { - attributes.put(SIZE_ATTR, String.valueOf(blob.getSize())); - } - - if (blob.getCacheControl() != null) { - attributes.put(CACHE_CONTROL_ATTR, blob.getCacheControl()); - } - - if (blob.getComponentCount() != null) { - attributes.put(COMPONENT_COUNT_ATTR, String.valueOf(blob.getComponentCount())); - } - - if (blob.getContentDisposition() != null) { - attributes.put(CONTENT_DISPOSITION_ATTR, blob.getContentDisposition()); - final Util.ParsedContentDisposition parsed = Util.parseContentDisposition(blob.getContentDisposition()); - - if (parsed != null) { - attributes.put(CoreAttributes.FILENAME.key(), parsed.getFileName()); - } - } - - if (blob.getContentEncoding() != null) { - attributes.put(CONTENT_ENCODING_ATTR, blob.getContentEncoding()); - } - - if (blob.getContentLanguage() != null) { - attributes.put(CONTENT_LANGUAGE_ATTR, blob.getContentLanguage()); - } - - if (blob.getContentType() != null) { - attributes.put(CoreAttributes.MIME_TYPE.key(), blob.getContentType()); - } - - if (blob.getCrc32c() != null) { - attributes.put(CRC32C_ATTR, blob.getCrc32c()); - } - - if (blob.getCustomerEncryption() != null) { - final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption(); - - attributes.put(ENCRYPTION_ALGORITHM_ATTR, encryption.getEncryptionAlgorithm()); - attributes.put(ENCRYPTION_SHA256_ATTR, encryption.getKeySha256()); - } - - if (blob.getEtag() != null) { - attributes.put(ETAG_ATTR, blob.getEtag()); - } - - if (blob.getGeneratedId() != null) { - attributes.put(GENERATED_ID_ATTR, blob.getGeneratedId()); - } - - if (blob.getGeneration() != null) { - attributes.put(GENERATION_ATTR, String.valueOf(blob.getGeneration())); - } - - if (blob.getMd5() != null) { - attributes.put(MD5_ATTR, blob.getMd5()); - } - - if (blob.getMediaLink() != null) { - attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink()); - } - - if (blob.getMetageneration() != null) { - attributes.put(METAGENERATION_ATTR, String.valueOf(blob.getMetageneration())); - } - - if (blob.getOwner() != null) { - final Acl.Entity entity = blob.getOwner(); - - if (entity instanceof Acl.User) { - attributes.put(OWNER_ATTR, ((Acl.User) entity).getEmail()); - attributes.put(OWNER_TYPE_ATTR, "user"); - } else if (entity instanceof Acl.Group) { - attributes.put(OWNER_ATTR, ((Acl.Group) entity).getEmail()); - attributes.put(OWNER_TYPE_ATTR, "group"); - } else if (entity instanceof Acl.Domain) { - attributes.put(OWNER_ATTR, ((Acl.Domain) entity).getDomain()); - attributes.put(OWNER_TYPE_ATTR, "domain"); - } else if (entity instanceof Acl.Project) { - attributes.put(OWNER_ATTR, ((Acl.Project) entity).getProjectId()); - attributes.put(OWNER_TYPE_ATTR, "project"); - } - } - - if (blob.getSelfLink() != null) { - attributes.put(URI_ATTR, blob.getSelfLink()); - } - - if (blob.getCreateTimeOffsetDateTime() != null) { - attributes.put(CREATE_TIME_ATTR, String.valueOf(blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli())); - } - - if (blob.getUpdateTimeOffsetDateTime() != null) { - attributes.put(UPDATE_TIME_ATTR, String.valueOf(blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli())); - } - } catch (StorageException e) { - getLogger().error("Failure completing upload flowfile={} bucket={} key={} reason={}", - ffFilename, bucket, key, e.getMessage(), e); - throw (e); + attributes.put(ENCRYPTION_ALGORITHM_ATTR, encryption.getEncryptionAlgorithm()); + attributes.put(ENCRYPTION_SHA256_ATTR, encryption.getKeySha256()); + } + + if (blob.getEtag() != null) { + attributes.put(ETAG_ATTR, blob.getEtag()); + } + + if (blob.getGeneratedId() != null) { + attributes.put(GENERATED_ID_ATTR, blob.getGeneratedId()); + } + + if (blob.getGeneration() != null) { + attributes.put(GENERATION_ATTR, String.valueOf(blob.getGeneration())); + } + + if (blob.getMd5() != null) { + attributes.put(MD5_ATTR, blob.getMd5()); + } + + if (blob.getMediaLink() != null) { + attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink()); + } + + if (blob.getMetageneration() != null) { + attributes.put(METAGENERATION_ATTR, String.valueOf(blob.getMetageneration())); + } + + if (blob.getOwner() != null) { + final Acl.Entity entity = blob.getOwner(); + + if (entity instanceof Acl.User) { + attributes.put(OWNER_ATTR, ((Acl.User) entity).getEmail()); + attributes.put(OWNER_TYPE_ATTR, "user"); + } else if (entity instanceof Acl.Group) { + attributes.put(OWNER_ATTR, ((Acl.Group) entity).getEmail()); + attributes.put(OWNER_TYPE_ATTR, "group"); + } else if (entity instanceof Acl.Domain) { + attributes.put(OWNER_ATTR, ((Acl.Domain) entity).getDomain()); + attributes.put(OWNER_TYPE_ATTR, "domain"); + } else if (entity instanceof Acl.Project) { + attributes.put(OWNER_ATTR, ((Acl.Project) entity).getProjectId()); + attributes.put(OWNER_TYPE_ATTR, "project"); } + } + if (blob.getSelfLink() != null) { + attributes.put(URI_ATTR, blob.getSelfLink()); + } + + if (blob.getCreateTimeOffsetDateTime() != null) { + attributes.put(CREATE_TIME_ATTR, String.valueOf(blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli())); + } + if (blob.getUpdateTimeOffsetDateTime() != null) { + attributes.put(UPDATE_TIME_ATTR, String.valueOf(blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli())); } + } catch (StorageException | IOException e) { + getLogger().error("Failure completing upload flowfile={} bucket={} key={} reason={}", + ffFilename, bucket, key, e.getMessage(), e); + throw (e); } - }); + } if (!attributes.isEmpty()) { flowFile = session.putAllAttributes(flowFile, attributes); @@ -527,7 +530,7 @@ public class PutGCSObject extends AbstractGCSProcessor { getLogger().info("Successfully put {} to Google Cloud Storage in {} milliseconds", new Object[]{ff, millis}); - } catch (final ProcessException | StorageException e) { + } catch (final ProcessException | StorageException | IOException e) { getLogger().error("Failed to put {} to Google Cloud Storage due to {}", flowFile, e.getMessage(), e); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java index 2cb446d87a..0277ce0d6b 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java @@ -22,6 +22,17 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processors.transfer.ResourceTransferSource; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; + import java.io.InputStream; import java.time.Instant; import java.time.LocalDateTime; @@ -31,13 +42,6 @@ import java.time.ZoneOffset; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; import static com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_READ; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR; @@ -62,10 +66,13 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYP import static org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; +import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -174,6 +181,40 @@ public class PutGCSObjectTest extends AbstractGCSTest { assertNull(blobInfo.getCrc32c()); } + @Test + public void testSuccessfulPutOperationFromLocalFileSource() throws Exception { + reset(storageOptions, storage, blob); + when(storageOptions.getHost()).thenReturn(STORAGE_API_URL); + when(storage.getOptions()).thenReturn(storageOptions); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + + String serviceId = "fileresource"; + FileResourceService service = mock(FileResourceService.class); + InputStream localFileInputStream = mock(InputStream.class); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getFileResource(anyMap())).thenReturn(new FileResource(localFileInputStream, 10)); + + runner.addControllerService(serviceId, service); + runner.enableControllerService(service); + runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue()); + runner.setProperty(FILE_RESOURCE_SERVICE, serviceId); + + runner.assertValid(); + + when(storage.createFrom(blobInfoArgumentCaptor.capture(), + inputStreamArgumentCaptor.capture(), + blobWriteOptionArgumentCaptor.capture())).thenReturn(blob); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + assertEquals(inputStreamArgumentCaptor.getValue(), localFileInputStream); + } + @Test public void testSuccessfulPutOperation() throws Exception { reset(storageOptions, storage, blob);