This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 4700fed249 NIFI-10965 PutGoogleDrive 4700fed249 is described below commit 4700fed249cf975ee50168c38cce85d090a0312c Author: krisztina-zsihovszki <zsikr...@gmail.com> AuthorDate: Thu Dec 8 16:58:59 2022 +0100 NIFI-10965 PutGoogleDrive This closes #6832. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../processors/gcp/drive/FetchGoogleDrive.java | 103 +++-- .../gcp/drive/GoogleDriveAttributes.java | 46 ++ .../processors/gcp/drive/GoogleDriveFileInfo.java | 12 +- .../gcp/drive/GoogleDriveFlowFileAttribute.java | 10 +- .../processors/gcp/drive/GoogleDriveTrait.java | 16 + .../nifi/processors/gcp/drive/ListGoogleDrive.java | 59 +-- .../nifi/processors/gcp/drive/PutGoogleDrive.java | 501 +++++++++++++++++++++ .../services/org.apache.nifi.processor.Processor | 1 + .../additionalDetails.html | 16 + .../additionalDetails.html | 15 +- .../gcp/drive/AbstractGoogleDriveIT.java | 10 +- .../gcp/drive/AbstractGoogleDriveTest.java | 108 +++++ .../processors/gcp/drive/FetchGoogleDriveIT.java | 46 +- .../processors/gcp/drive/FetchGoogleDriveTest.java | 120 +++++ ...est.java => ListGoogleDriveTestRunnerTest.java} | 42 +- .../processors/gcp/drive/PutGoogleDriveIT.java | 215 +++++++++ .../processors/gcp/drive/PutGoogleDriveTest.java | 253 +++++++++++ 17 files changed, 1455 insertions(+), 118 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java index 347534fe9e..c958cd4489 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java @@ -16,10 +16,34 @@ */ package 
org.apache.nifi.processors.gcp.drive; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC; + import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.services.drive.Drive; import com.google.api.services.drive.DriveScopes; +import com.google.api.services.drive.model.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -39,31 +63,28 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; import 
org.apache.nifi.processors.gcp.util.GoogleUtils; import org.apache.nifi.proxy.ProxyConfiguration; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"google", "drive", "storage", "fetch"}) @CapabilityDescription("Fetches files from a Google Drive Folder. Designed to be used in tandem with ListGoogleDrive. " + - "For how to setup access to Google Drive please see additional details.") -@SeeAlso({ListGoogleDrive.class}) + "Please see Additional Details to set up access to Google Drive.") +@SeeAlso({ListGoogleDrive.class, PutGoogleDrive.class}) +@ReadsAttribute(attribute = ID, description = ID_DESC) @WritesAttributes({ - @WritesAttribute(attribute = FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, description = "The error code returned by Google Drive when the fetch of a file fails"), - @WritesAttribute(attribute = FetchGoogleDrive.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by Google Drive when the fetch of a file fails") + @WritesAttribute(attribute = ID, description = ID_DESC), + @WritesAttribute(attribute = "filename", description = FILENAME_DESC), + @WritesAttribute(attribute = "mime.type", description = MIME_TYPE_DESC), + @WritesAttribute(attribute = SIZE, description = SIZE_DESC), + @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC), + @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC), + @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC) }) public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait { - public static final String ERROR_CODE_ATTRIBUTE = "error.code"; - public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message"; public static final PropertyDescriptor FILE_ID = new PropertyDescriptor .Builder().name("drive-file-id") 
.displayName("File ID") - .description("The Drive ID of the File to fetch") + .description("The Drive ID of the File to fetch. " + + "Please see Additional Details to obtain Drive ID.") .required(true) .defaultValue("${drive.id}") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -73,12 +94,12 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") - .description("A flowfile will be routed here for each successfully fetched File.") + .description("A FlowFile will be routed here for each successfully fetched File.") .build(); public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("A flowfile will be routed here for each File for which fetch was attempted but failed.") + .description("A FlowFile will be routed here for each File for which fetch was attempted but failed.") .build(); private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( @@ -87,7 +108,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) )); - public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( REL_SUCCESS, REL_FAILURE ))); @@ -101,7 +122,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr @Override public Set<Relationship> getRelationships() { - return relationships; + return RELATIONSHIPS; } @OnScheduled @@ -122,13 +143,20 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr return; } - String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue(); + final String 
fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue(); - FlowFile outFlowFile = flowFile; + final long startNanos = System.nanoTime(); try { - outFlowFile = fetchFile(fileId, session, outFlowFile); + flowFile = fetchFile(fileId, session, flowFile); + + final File fileMetadata = fetchFileMetadata(fileId); + final Map<String, String> attributes = createAttributeMap(fileMetadata); + flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(outFlowFile, REL_SUCCESS); + final String url = DRIVE_URL + fileMetadata.getId(); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().fetch(flowFile, url, transferMillis); + session.transfer(flowFile, REL_SUCCESS); } catch (GoogleJsonResponseException e) { handleErrorResponse(session, fileId, flowFile, e); } catch (Exception e) { @@ -136,31 +164,40 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr } } - FlowFile fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException { - InputStream driveFileInputStream = driveService + private FlowFile fetchFile(String fileId, ProcessSession session, FlowFile flowFile) throws IOException { + try (final InputStream driveFileInputStream = driveService .files() .get(fileId) - .executeMediaAsInputStream(); + .executeMediaAsInputStream()) { - outFlowFile = session.importFrom(driveFileInputStream, outFlowFile); + return session.importFrom(driveFileInputStream, flowFile); + } + } - return outFlowFile; + private File fetchFileMetadata(String fileId) throws IOException { + return driveService + .files() + .get(fileId) + .setFields("id, name, createdTime, mimeType, size") + .execute(); } - private void handleErrorResponse(ProcessSession session, String fileId, FlowFile outFlowFile, GoogleJsonResponseException e) { + private void handleErrorResponse(ProcessSession session, String fileId, FlowFile flowFile, 
GoogleJsonResponseException e) { getLogger().error("Couldn't fetch file with id '{}'", fileId, e); - outFlowFile = session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode()); - outFlowFile = session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + flowFile = session.putAttribute(flowFile, ERROR_CODE, "" + e.getStatusCode()); + flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage()); - session.transfer(outFlowFile, REL_FAILURE); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); } private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) { getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e); - flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage()); + flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java new file mode 100644 index 0000000000..6c4eb47fe5 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +public class GoogleDriveAttributes { + + public static final String ID = "drive.id"; + public static final String ID_DESC = "The id of the file"; + + public static final String FILENAME = CoreAttributes.FILENAME.key(); + public static final String FILENAME_DESC = "The name of the file"; + + public static final String SIZE = "drive.size"; + public static final String SIZE_DESC = "The size of the file"; + + public static final String TIMESTAMP = "drive.timestamp"; + public static final String TIMESTAMP_DESC = "The last modified time or created time (whichever is greater) of the file." + + " The reason for this is that the original modified date of a file is preserved when uploaded to Google Drive." + + " 'Created time' takes the time when the upload occurs. 
However uploaded files can still be modified later."; + + public static final String MIME_TYPE = CoreAttributes.MIME_TYPE.key(); + public static final String MIME_TYPE_DESC = "The MIME type of the file"; + + public static final String ERROR_MESSAGE = "error.message"; + public static final String ERROR_MESSAGE_DESC = "The error message returned by Google Drive"; + + public static final String ERROR_CODE = "error.code"; + public static final String ERROR_CODE_DESC = "The error code returned by Google Drive"; + +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java index f4a1a0a3cf..7859a57bdd 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java @@ -16,6 +16,12 @@ */ package org.apache.nifi.processors.gcp.drive; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP; + import org.apache.nifi.processor.util.list.ListableEntity; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; @@ -30,12 +36,6 @@ import java.util.List; import java.util.Map; public class GoogleDriveFileInfo implements ListableEntity { - public static final String ID = "drive.id"; - public static final String FILENAME = "filename"; - public static final 
String SIZE = "drive.size"; - public static final String TIMESTAMP = "drive.timestamp"; - public static final String MIME_TYPE = "mime.type"; - private static final RecordSchema SCHEMA; static { diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java index 19f73fa398..95f07a6a20 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java @@ -22,17 +22,17 @@ import java.util.Optional; import java.util.function.Function; public enum GoogleDriveFlowFileAttribute { - ID(GoogleDriveFileInfo.ID, GoogleDriveFileInfo::getId), - FILE_NAME(GoogleDriveFileInfo.FILENAME, GoogleDriveFileInfo::getName), - SIZE(GoogleDriveFileInfo.SIZE, fileInfo -> Optional.ofNullable(fileInfo.getSize()) + ID(GoogleDriveAttributes.ID, GoogleDriveFileInfo::getId), + FILENAME(GoogleDriveAttributes.FILENAME, GoogleDriveFileInfo::getName), + SIZE(GoogleDriveAttributes.SIZE, fileInfo -> Optional.ofNullable(fileInfo.getSize()) .map(String::valueOf) .orElse(null) ), - TIME_STAMP(GoogleDriveFileInfo.TIMESTAMP, fileInfo -> Optional.ofNullable(fileInfo.getTimestamp()) + TIMESTAMP(GoogleDriveAttributes.TIMESTAMP, fileInfo -> Optional.ofNullable(fileInfo.getTimestamp()) .map(String::valueOf) .orElse(null) ), - MIME_TYPE(GoogleDriveFileInfo.MIME_TYPE, GoogleDriveFileInfo::getMimeType); + MIME_TYPE(GoogleDriveAttributes.MIME_TYPE, GoogleDriveFileInfo::getMimeType); private final String name; private final Function<GoogleDriveFileInfo, String> fromFileInfo; diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java index 5794c0b598..2cf18d9752 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java @@ -20,8 +20,11 @@ import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.gson.GsonFactory; import com.google.api.services.drive.Drive; +import com.google.api.services.drive.model.File; import com.google.auth.http.HttpCredentialsAdapter; import com.google.auth.oauth2.GoogleCredentials; +import java.util.HashMap; +import java.util.Map; import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.gcp.util.GoogleUtils; @@ -30,6 +33,9 @@ import java.util.Arrays; import java.util.Collection; public interface GoogleDriveTrait { + + String DRIVE_FOLDER_MIME_TYPE = "application/vnd.google-apps.folder"; + String DRIVE_URL = "https://drive.google.com/open?id=" ; String APPLICATION_NAME = "NiFi"; JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); @@ -66,4 +72,14 @@ public interface GoogleDriveTrait { return gcpCredentialsService.getGoogleCredentials(); } + + default Map<String, String> createAttributeMap(File file) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(GoogleDriveAttributes.ID, file.getId()); + attributes.put(GoogleDriveAttributes.FILENAME, file.getName()); + attributes.put(GoogleDriveAttributes.MIME_TYPE, file.getMimeType()); + attributes.put(GoogleDriveAttributes.TIMESTAMP, String.valueOf(file.getCreatedTime())); + 
attributes.put(GoogleDriveAttributes.SIZE, String.valueOf(file.getSize())); + return attributes; + } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java index 60f4d789cd..4488f9006a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java @@ -16,12 +16,36 @@ */ package org.apache.nifi.processors.gcp.drive; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC; + import com.google.api.client.http.HttpTransport; import com.google.api.client.util.DateTime; import com.google.api.services.drive.Drive; import com.google.api.services.drive.DriveScopes; import com.google.api.services.drive.model.File; import com.google.api.services.drive.model.FileList; +import java.io.IOException; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import 
java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -50,22 +74,6 @@ import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.serialization.record.RecordSchema; -import java.io.IOException; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - @PrimaryNodeOnly @TriggerSerially @Tags({"google", "drive", "storage"}) @@ -74,16 +82,15 @@ import java.util.concurrent.TimeUnit; "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " + "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " + "previous node left off without duplicating all of the data. 
" + - "For how to setup access to Google Drive please see additional details.") -@SeeAlso({FetchGoogleDrive.class}) + "Please see Additional Details to set up access to Google Drive.") +@SeeAlso({FetchGoogleDrive.class, PutGoogleDrive.class}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@WritesAttributes({@WritesAttribute(attribute = GoogleDriveFileInfo.ID, description = "The id of the file"), - @WritesAttribute(attribute = GoogleDriveFileInfo.FILENAME, description = "The name of the file"), - @WritesAttribute(attribute = GoogleDriveFileInfo.SIZE, description = "The size of the file"), - @WritesAttribute(attribute = GoogleDriveFileInfo.TIMESTAMP, description = "The last modified time or created time (whichever is greater) of the file." + - " The reason for this is that the original modified date of a file is preserved when uploaded to Google Drive." + - " 'Created time' takes the time when the upload occurs. However uploaded files can still be modified later."), - @WritesAttribute(attribute = GoogleDriveFileInfo.MIME_TYPE, description = "MimeType of the file")}) +@WritesAttributes({ + @WritesAttribute(attribute = ID, description = ID_DESC), + @WritesAttribute(attribute = "filename", description = FILENAME_DESC), + @WritesAttribute(attribute = "mime.type", description = MIME_TYPE_DESC), + @WritesAttribute(attribute = SIZE, description = SIZE_DESC), + @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC)}) @Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already." + " What exactly needs to be stored depends on the 'Listing Strategy'." 
+ " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up" + @@ -94,7 +101,7 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> .name("folder-id") .displayName("Folder ID") .description("The ID of the folder from which to pull list of files." + - " For how to setup access to Google Drive and obtain Folder ID please see additional details." + + " Please see Additional Details to set up access to Google Drive and obtain Folder ID." + " WARNING: Unauthorized access to the folder is treated as if the folder was empty." + " This results in the processor not creating outgoing FlowFiles. No additional error message is provided.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java new file mode 100644 index 0000000000..0276c3a0eb --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import static java.lang.String.format; +import static java.lang.String.valueOf; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.joining; +import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.createRegexMatchingValidator; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC; +import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.media.MediaHttpUploader; +import 
com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.InputStreamContent; +import com.google.api.client.util.DateTime; +import com.google.api.services.drive.Drive; +import com.google.api.services.drive.DriveRequest; +import com.google.api.services.drive.DriveScopes; +import com.google.api.services.drive.model.File; +import com.google.api.services.drive.model.FileList; +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import 
org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.json.JSONObject; + +@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"google", "drive", "storage", "put"}) +@CapabilityDescription("Puts content to a Google Drive Folder.") +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the Google Drive object.") +@WritesAttributes({ + @WritesAttribute(attribute = ID, description = ID_DESC), + @WritesAttribute(attribute = "filename", description = FILENAME_DESC), + @WritesAttribute(attribute = "mime.type", description = MIME_TYPE_DESC), + @WritesAttribute(attribute = SIZE, description = SIZE_DESC), + @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC), + @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC), + @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)}) +public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrait { + + public static final String IGNORE_RESOLUTION = "ignore"; + public static final String REPLACE_RESOLUTION = "replace"; + public static final String FAIL_RESOLUTION = "fail"; + public static final int MIN_ALLOWED_CHUNK_SIZE_IN_BYTES = MediaHttpUploader.MINIMUM_CHUNK_SIZE; + public static final int MAX_ALLOWED_CHUNK_SIZE_IN_BYTES = 1024 * 1024 * 1024; + + public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder() + .name("folder-id") + .displayName("Folder ID") + .description("The ID of the shared folder. 
" + + " Please see Additional Details to set up access to Google Drive and obtain Folder ID.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor SUBFOLDER_NAME = new PropertyDescriptor.Builder() + .name("subfolder-name") + .displayName("Subfolder Name") + .description("The name (path) of the subfolder where files are uploaded. The subfolder name is relative to the shared folder specified by 'Folder ID'." + + " Example: subFolder, subFolder1/subfolder2") + .addValidator(createRegexMatchingValidator(Pattern.compile("^(?!/).+(?<!/)$"), false, + "Subfolder Name should not contain leading or trailing slash ('/') character.")) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .build(); + + public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder() + .name("file-name") + .displayName("Filename") + .description("The name of the file to upload to the specified Google Drive folder.") + .required(true) + .defaultValue("${filename}") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CREATE_SUBFOLDER = new PropertyDescriptor.Builder() + .name("create-subfolder") + .displayName("Create Subfolder") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .dependsOn(SUBFOLDER_NAME) + .description("Specifies whether to automatically create the subfolder specified by 'Folder Name' if it does not exist. " + + "Permission to list folders is required. 
") + .build(); + + public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("conflict-resolution-strategy") + .displayName("Conflict Resolution Strategy") + .description("Indicates what should happen when a file with the same name already exists in the specified Google Drive folder.") + .required(true) + .defaultValue(FAIL_RESOLUTION) + .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, REPLACE_RESOLUTION) + .build(); + + public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder() + .name("chunked-upload-size") + .displayName("Chunked Upload Size") + .description("Defines the size of a chunk. Used when a FlowFile's size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller chunks. " + + "Minimum allowed chunk size is 256 KB, maximum allowed chunk size is 1 GB.") + .addValidator(createChunkSizeValidator()) + .defaultValue("10 MB") + .required(false) + .build(); + + public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new PropertyDescriptor.Builder() + .name("chunked-upload-threshold") + .displayName("Chunked Upload Threshold") + .description("The maximum size of the content which is uploaded at once. 
FlowFiles larger than this threshold are uploaded in chunks.") + .defaultValue("100 MB") + .addValidator(DATA_SIZE_VALIDATOR) + .required(false) + .build(); + + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(asList( + GCP_CREDENTIALS_PROVIDER_SERVICE, + FOLDER_ID, + SUBFOLDER_NAME, + CREATE_SUBFOLDER, + FILE_NAME, + CONFLICT_RESOLUTION, + CHUNKED_UPLOAD_THRESHOLD, + CHUNKED_UPLOAD_SIZE, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) + )); + + public static final Relationship REL_SUCCESS = + new Relationship.Builder() + .name("success") + .description("Files that have been successfully written to Google Drive are transferred to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = + new Relationship.Builder() + .name("failure") + .description("Files that could not be written to Google Drive for some reason are transferred to this relationship.") + .build(); + + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(asList( + REL_SUCCESS, + REL_FAILURE + ))); + + public static final String MULTIPART_UPLOAD_URL = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart"; + + private volatile Drive driveService; + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public List<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext)); + + final long chunkUploadThreshold = validationContext.getProperty(CHUNKED_UPLOAD_THRESHOLD) + .asDataSize(DataUnit.B) + .longValue(); + + final int uploadChunkSize = validationContext.getProperty(CHUNKED_UPLOAD_SIZE) + .asDataSize(DataUnit.B) + .intValue(); + + if (uploadChunkSize > 
chunkUploadThreshold) { + results.add(new ValidationResult.Builder() + .subject(CHUNKED_UPLOAD_SIZE.getDisplayName()) + .explanation(format("%s should not be bigger than %s", CHUNKED_UPLOAD_SIZE.getDisplayName(), CHUNKED_UPLOAD_THRESHOLD.getDisplayName())) + .valid(false) + .build()); + } + + return results; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + String folderId = context.getProperty(FOLDER_ID).evaluateAttributeExpressions(flowFile).getValue(); + final String subfolderName = context.getProperty(SUBFOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final boolean createFolder = context.getProperty(CREATE_SUBFOLDER).asBoolean(); + final String filename = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); + + try { + folderId = subfolderName != null ? getOrCreateParentSubfolder(subfolderName, folderId, createFolder).getId() : folderId; + + final long startNanos = System.nanoTime(); + final long size = flowFile.getSize(); + + final long chunkUploadThreshold = context.getProperty(CHUNKED_UPLOAD_THRESHOLD) + .asDataSize(DataUnit.B) + .longValue(); + + final int uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE) + .asDataSize(DataUnit.B) + .intValue(); + + final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); + + final Optional<File> alreadyExistingFile = checkFileExistence(filename, folderId); + final File fileMetadata = alreadyExistingFile.isPresent() ? 
alreadyExistingFile.get() : createMetadata(filename, folderId); + + if (alreadyExistingFile.isPresent() && FAIL_RESOLUTION.equals(conflictResolution)) { + getLogger().error("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), FAIL_RESOLUTION); + flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (alreadyExistingFile.isPresent() && IGNORE_RESOLUTION.equals(conflictResolution)) { + getLogger().info("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), IGNORE_RESOLUTION); + flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session); + session.transfer(flowFile, REL_SUCCESS); + return; + } + + final File uploadedFile; + + try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedInputStream = new BufferedInputStream(rawIn)) { + final InputStreamContent mediaContent = new InputStreamContent(mimeType, bufferedInputStream); + mediaContent.setLength(size); + + final DriveRequest<File> driveRequest = createDriveRequest(fileMetadata, mediaContent); + + if (size > chunkUploadThreshold) { + uploadedFile = uploadFileInChunks(driveRequest, fileMetadata, uploadChunkSize, mediaContent); + } else { + uploadedFile = driveRequest.execute(); + } + } + + if (uploadedFile != null) { + final Map<String, String> attributes = createAttributeMap(uploadedFile); + final String url = DRIVE_URL + uploadedFile.getId(); + flowFile = session.putAllAttributes(flowFile, attributes); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, url, transferMillis); + } + session.transfer(flowFile, REL_SUCCESS); + } catch (GoogleJsonResponseException e) { + getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename, + 
getFolderName(subfolderName), e); + handleExpectedError(session, flowFile, e); + } catch (Exception e) { + getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename, + getFolderName(subfolderName), e); + + if (e.getCause() != null && e.getCause() instanceof GoogleJsonResponseException) { + handleExpectedError(session, flowFile, (GoogleJsonResponseException) e.getCause()); + } else { + handleUnexpectedError(session, flowFile, e); + } + } + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); + + final HttpTransport httpTransport = new ProxyAwareTransportFactory(proxyConfiguration).create(); + + driveService = createDriveService(context, httpTransport, DriveScopes.DRIVE, DriveScopes.DRIVE_METADATA); + } + + private FlowFile addAttributes(File file, FlowFile flowFile, ProcessSession session) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(ID, file.getId()); + attributes.put(FILENAME, file.getName()); + return session.putAllAttributes(flowFile, attributes); + } + + private String getFolderName(String subFolderName) { + return subFolderName == null ? 
"shared" : format("'%s'", subFolderName); + } + + private DriveRequest<File> createDriveRequest(File fileMetadata, final InputStreamContent mediaContent) throws IOException { + if (fileMetadata.getId() == null) { + return driveService.files() + .create(fileMetadata, mediaContent) + .setFields("id, name, createdTime, mimeType, size"); + } else { + return driveService.files() + .update(fileMetadata.getId(), new File(), mediaContent) + .setFields("id, name, createdTime, mimeType, size"); + } + } + + private File uploadFileInChunks(DriveRequest<File> driveRequest, File fileMetadata, final int chunkSize, final InputStreamContent mediaContent) throws IOException { + final HttpResponse response = driveRequest + .getMediaHttpUploader() + .setChunkSize(chunkSize) + .setDirectUploadEnabled(false) + .upload(new GenericUrl(MULTIPART_UPLOAD_URL)); + + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK) { + fileMetadata.setId(getUploadedFileId(response.getContent())); + fileMetadata.setMimeType(mediaContent.getType()); + fileMetadata.setCreatedTime(new DateTime(System.currentTimeMillis())); + fileMetadata.setSize(mediaContent.getLength()); + return fileMetadata; + } else { + throw new ProcessException(format("Upload of file '%s' to folder '%s' failed, HTTP error code: %d", fileMetadata.getName(), fileMetadata.getId(), response.getStatusCode())); + } + } + + private String getUploadedFileId(final InputStream content) { + final String contentAsString = new BufferedReader(new InputStreamReader(content, UTF_8)) + .lines() + .collect(joining("\n")); + return new JSONObject(contentAsString).getString("id"); + } + + private File getOrCreateParentSubfolder(String folderName, String parentFolderId, boolean createFolder) throws IOException { + final int indexOfPathSeparator = folderName.indexOf("/"); + + if (isMultiLevelFolder(indexOfPathSeparator, folderName)) { + final String mainFolderName = folderName.substring(0, indexOfPathSeparator); + final String subFolders = 
folderName.substring(indexOfPathSeparator + 1); + final File mainFolder = getOrCreateFolder(mainFolderName, parentFolderId, createFolder); + return getOrCreateParentSubfolder(subFolders, mainFolder.getId(), createFolder); + } else { + return getOrCreateFolder(folderName, parentFolderId, createFolder); + } + } + + private boolean isMultiLevelFolder(int indexOfPathSeparator, String folderName) { + return indexOfPathSeparator > 0 && indexOfPathSeparator < folderName.length() - 1; + } + + private File getOrCreateFolder(String folderName, String parentFolderId, boolean createFolder) throws IOException { + final Optional<File> existingFolder = checkFolderExistence(folderName, parentFolderId); + + if (existingFolder.isPresent()) { + return existingFolder.get(); + } + + if (createFolder) { + getLogger().debug("Create folder " + folderName + " parent id: " + parentFolderId); + final File folderMetadata = createMetadata(folderName, parentFolderId); + folderMetadata.setMimeType(DRIVE_FOLDER_MIME_TYPE); + + return driveService.files() + .create(folderMetadata) + .setFields("id, parents") + .execute(); + } else { + throw new ProcessException(format("The specified subfolder '%s' does not exist and '%s' is false.", folderName, CREATE_SUBFOLDER.getDisplayName())); + } + } + + private File createMetadata(final String name, final String parentId) { + final File metadata = new File(); + metadata.setName(name); + metadata.setParents(singletonList(parentId)); + return metadata; + } + + private Optional<File> checkFolderExistence(String folderName, String parentId) throws IOException { + return checkObjectExistence(format("mimeType='%s' and name='%s' and ('%s' in parents)", DRIVE_FOLDER_MIME_TYPE, folderName, parentId)); + } + + private Optional<File> checkFileExistence(String fileName, String parentId) throws IOException { + return checkObjectExistence(format("name='%s' and ('%s' in parents)", fileName, parentId)); + } + + private Optional<File> checkObjectExistence(String query) 
throws IOException { + final FileList result = driveService.files() + .list() + .setQ(query) + .setFields("files(name, id)") + .execute(); + + return result.getFiles().stream() + .findFirst(); + } + + private void handleUnexpectedError(final ProcessSession session, FlowFile flowFile, final Exception e) { + flowFile = session.putAttribute(flowFile, GoogleDriveAttributes.ERROR_MESSAGE, e.getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + + private void handleExpectedError(final ProcessSession session, FlowFile flowFile, final GoogleJsonResponseException e) { + flowFile = session.putAttribute(flowFile, GoogleDriveAttributes.ERROR_MESSAGE, e.getMessage()); + flowFile = session.putAttribute(flowFile, GoogleDriveAttributes.ERROR_CODE, valueOf(e.getStatusCode())); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + + private static Validator createChunkSizeValidator() { + return (subject, input, context) -> { + final ValidationResult vr = StandardValidators.createDataSizeBoundsValidator(MIN_ALLOWED_CHUNK_SIZE_IN_BYTES, MAX_ALLOWED_CHUNK_SIZE_IN_BYTES) + .validate(subject, input, context); + if (!vr.isValid()) { + return vr; + } + + final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue(); + + if (dataSizeBytes % MIN_ALLOWED_CHUNK_SIZE_IN_BYTES != 0 ) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Must be a positive multiple of " + MIN_ALLOWED_CHUNK_SIZE_IN_BYTES + " bytes") + .build(); + } + + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(true) + .build(); + }; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 99a4df40a3..1fe8dd6535 
100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -25,6 +25,7 @@ org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming org.apache.nifi.processors.gcp.drive.ListGoogleDrive org.apache.nifi.processors.gcp.drive.FetchGoogleDrive +org.apache.nifi.processors.gcp.drive.PutGoogleDrive org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateImagesOperation org.apache.nifi.processors.gcp.vision.StartGcpVisionAnnotateFilesOperation org.apache.nifi.processors.gcp.vision.GetGcpVisionAnnotateImagesOperationStatus diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html index cdcd1421fe..7b73ebb050 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html @@ -45,6 +45,22 @@ <li>Enter the service account email.</li> </ul> </li> + <li><b>Find File ID</b> + </br> + Usually FetchGoogleDrive is used with ListGoogleDrive and 'drive.id' is set.</br> + In case 'drive.id' is not available, you can find the Drive ID of the file in the following way: + </br> + <ul> + <li>Right-click on the file and select "Get Link".</li> + <li>In the pop-up window click on "Copy Link".</li> + <li>You can obtain the file ID from the URL copied to clipboard. 
+ For example, if the URL were <code>https://drive.google.com/file/d/16ALV9KIU_KKeNG557zyctqy2Fmzyqtq/view?usp=share_link</code>,<br> + the File ID would be <code>16ALV9KIU_KKeNG557zyctqy2Fmzyqtq</code> + </li> + </ul> + </li> + <li><b>Set File ID in 'File ID' property</b> + </li> </ol> </body> diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.PutGoogleDrive/additionalDetails.html similarity index 77% copy from nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html copy to nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.PutGoogleDrive/additionalDetails.html index cdcd1421fe..89c408458d 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.FetchGoogleDrive/additionalDetails.html +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.drive.PutGoogleDrive/additionalDetails.html @@ -17,7 +17,7 @@ <head> <meta charset="utf-8"/> - <title>FetchGoogleDrive</title> + <title>PutGoogleDrive</title> <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> </head> <body> @@ -40,11 +40,22 @@ <ul> <li>In Google Cloud Console navigate to IAM & Admin -> Service Accounts.</li> <li>Take a note of the email of the service account you are going to use.</li> - <li>Navigate to the folder to be listed in Google Drive.</li> + <li>Navigate to the folder in Google Drive which will be used as the base folder.</li> <li>Right-click on the Folder -> Share.</li> <li>Enter the service account email.</li> </ul> </li> + <li><b>Find Folder ID</b> + 
<ul> + <li>Navigate to the folder in Google Drive where the files will be uploaded and enter it. The URL in your browser will include the ID at the end of + the URL. + For example, if the URL were <code>https://drive.google.com/drive/folders/1trTraPVCnX5_TNwO8d9P_bz278xWOmGm</code>, the + Folder ID would be <code>1trTraPVCnX5_TNwO8d9P_bz278xWOmGm</code> + </li> + </ul> + </li> + <li><b>Set Folder ID in 'Folder ID' property</b> + </li> </ol> </body> diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java index 0176803602..d2a16a709a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java @@ -49,11 +49,10 @@ import java.util.Arrays; * WARNING: The creation of a file is not a synchronized operation, may need to adjust tests accordingly!
*/ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Processor> { - private static final String CREDENTIAL_JSON_FILE_PATH = ""; - private static final String SHARED_FOLDER_ID = ""; - + protected static final String SHARED_FOLDER_ID = ""; protected static final String DEFAULT_FILE_CONTENT = "test_content"; + private static final String CREDENTIAL_JSON_FILE_PATH = ""; private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); protected T testSubject; @@ -101,6 +100,7 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process GCPCredentialsControllerService gcpCredentialsControllerService = new GCPCredentialsControllerService(); testRunner.addControllerService("gcp_credentials_provider_service", gcpCredentialsControllerService); + testRunner.setProperty(gcpCredentialsControllerService, CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, CREDENTIAL_JSON_FILE_PATH); testRunner.enableControllerService(gcpCredentialsControllerService); testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp_credentials_provider_service"); @@ -140,7 +140,7 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process Drive.Files.Create create = driveService.files() .create(fileMetadata, content) - .setFields("id, name, modifiedTime"); + .setFields("id, name, modifiedTime, createdTime"); File file = create.execute(); @@ -150,4 +150,6 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process public TestRunner getTestRunner() { return testRunner; } + + } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveTest.java new file mode 100644 index 0000000000..d868d27eff --- /dev/null +++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.gcp.drive; + +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.api.client.util.DateTime; +import com.google.api.services.drive.Drive; +import com.google.api.services.drive.model.File; +import java.util.Collections; +import java.util.Set; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.processors.gcp.util.GoogleUtils; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import 
org.junit.jupiter.api.BeforeEach; +import org.mockito.Mock; +import org.mockito.Mockito; + +public class AbstractGoogleDriveTest { + public static final String CONTENT = "1234567890"; + public static final String TEST_FILENAME = "testFile"; + public static final String TEST_FILE_ID = "fileId"; + public static final String SUBFOLDER_NAME = "subFolderName"; + public static final String SHARED_FOLDER_ID = "sharedFolderId"; + public static final String SUBFOLDER_ID = "subFolderId"; + public static final long TEST_SIZE = 42; + public static final long CREATED_TIME = 1659707000; + public static final String TEXT_TYPE = "text/plain"; + + protected TestRunner testRunner; + + @Mock(answer = RETURNS_DEEP_STUBS) + protected Drive mockDriverService; + + + @BeforeEach + protected void setUp() throws Exception { + String gcpCredentialsControllerServiceId = "gcp_credentials_provider_service"; + + final GCPCredentialsControllerService gcpCredentialsControllerService = mock(GCPCredentialsControllerService.class, Mockito.RETURNS_DEEP_STUBS); + when(gcpCredentialsControllerService.getIdentifier()).thenReturn(gcpCredentialsControllerServiceId); + + testRunner.addControllerService(gcpCredentialsControllerServiceId, gcpCredentialsControllerService); + testRunner.enableControllerService(gcpCredentialsControllerService); + testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, gcpCredentialsControllerServiceId); + } + + protected void assertFlowFileAttributes(Relationship relationship) { + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(relationship).get(0); + flowFile.assertAttributeEquals(GoogleDriveAttributes.ID, TEST_FILE_ID); + flowFile.assertAttributeEquals(GoogleDriveAttributes.FILENAME, TEST_FILENAME); + flowFile.assertAttributeEquals(GoogleDriveAttributes.TIMESTAMP, String.valueOf(new DateTime(CREATED_TIME))); + flowFile.assertAttributeEquals(GoogleDriveAttributes.SIZE, Long.toString(TEST_SIZE)); + 
flowFile.assertAttributeEquals(GoogleDriveAttributes.MIME_TYPE, TEXT_TYPE); + } + + protected void assertProvenanceEvent(ProvenanceEventType eventType) { + Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(eventType); + Set<ProvenanceEventType> actualEventTypes = testRunner.getProvenanceEvents().stream() + .map(ProvenanceEventRecord::getEventType) + .collect(toSet()); + assertEquals(expectedEventTypes, actualEventTypes); + } + + protected void assertNoProvenanceEvent() { + assertTrue(testRunner.getProvenanceEvents().isEmpty()); + } + + protected File createFile() { + return createFile(TEST_FILE_ID, TEST_FILENAME, SUBFOLDER_ID, TEXT_TYPE); + } + + protected File createFile(String id, String name, String parentId, String mimeType) { + File file = new File(); + file.setId(id); + file.setName(name); + file.setParents(singletonList(parentId)); + file.setCreatedTime(new DateTime(CREATED_TIME)); + file.setSize(TEST_SIZE); + file.setMimeType(mimeType); + return file; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java index 471adffba5..8b10406fc7 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java @@ -16,17 +16,18 @@ */ package org.apache.nifi.processors.gcp.drive; -import com.google.api.services.drive.model.File; -import org.apache.nifi.util.MockFlowFile; -import org.junit.jupiter.api.Test; +import static java.lang.String.valueOf; +import static java.util.Collections.singletonList; -import java.util.Arrays; +import com.google.api.services.drive.model.File; import java.util.HashMap; import java.util.HashSet; import 
java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.nifi.util.MockFlowFile; +import org.junit.jupiter.api.Test; /** * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test. @@ -45,11 +46,13 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive> File file = createFileWithDefaultContent("test_file.txt", mainFolderId); Map<String, String> inputFlowFileAttributes = new HashMap<>(); - inputFlowFileAttributes.put("drive.id", file.getId()); - inputFlowFileAttributes.put("filename", file.getName()); + inputFlowFileAttributes.put(GoogleDriveAttributes.ID, file.getId()); + inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, file.getName()); + inputFlowFileAttributes.put(GoogleDriveAttributes.SIZE, valueOf(DEFAULT_FILE_CONTENT.length())); + inputFlowFileAttributes.put(GoogleDriveAttributes.MIME_TYPE, "text/plain"); - HashSet<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(inputFlowFileAttributes)); - List<String> expectedContent = Arrays.asList(DEFAULT_FILE_CONTENT); + HashSet<Map<String, String>> expectedAttributes = new HashSet<>(singletonList(inputFlowFileAttributes)); + List<String> expectedContent = singletonList(DEFAULT_FILE_CONTENT); // WHEN testRunner.enqueue("unimportant_data", inputFlowFileAttributes); @@ -63,17 +66,17 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive> } @Test - void testInputFlowFileReferencesMissingFile() throws Exception { + void testInputFlowFileReferencesMissingFile() { // GIVEN Map<String, String> inputFlowFileAttributes = new HashMap<>(); - inputFlowFileAttributes.put("drive.id", "missing"); - inputFlowFileAttributes.put("filename", "missing_filename"); + inputFlowFileAttributes.put(GoogleDriveAttributes.ID, "missing"); + inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, "missing_filename"); - Set<Map<String, String>> expectedFailureAttributes 
= new HashSet<>(Arrays.asList( + Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(singletonList( new HashMap<String, String>() {{ - put("drive.id", "missing"); - put("filename", "missing_filename"); - put("error.code", "404"); + put(GoogleDriveAttributes.ID, "missing"); + put(GoogleDriveAttributes.FILENAME, "missing_filename"); + put(GoogleDriveAttributes.ERROR_CODE, "404"); }} )); @@ -83,7 +86,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive> // THEN testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); - checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); } @@ -93,11 +95,11 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive> File file = createFileWithDefaultContent("test_file.txt", mainFolderId); Map<String, String> inputFlowFileAttributes = new HashMap<>(); - inputFlowFileAttributes.put("drive.id", file.getId()); - inputFlowFileAttributes.put("filename", file.getName()); + inputFlowFileAttributes.put(GoogleDriveAttributes.ID, file.getId()); + inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, file.getName()); MockFlowFile input = new MockFlowFile(1) { - AtomicBoolean throwException = new AtomicBoolean(true); + final AtomicBoolean throwException = new AtomicBoolean(true); @Override public boolean isPenalized() { @@ -116,7 +118,7 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive> } }; - Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList( + Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(singletonList( inputFlowFileAttributes )); @@ -130,10 +132,12 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive> checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); } + @Override public Set<String> getCheckedAttributeNames() { Set<String> checkedAttributeNames = OutputChecker.super.getCheckedAttributeNames(); - 
checkedAttributeNames.add(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE); + checkedAttributeNames.add(GoogleDriveAttributes.ERROR_CODE); + checkedAttributeNames.remove(GoogleDriveAttributes.TIMESTAMP); return checkedAttributeNames; } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveTest.java new file mode 100644 index 0000000000..3cef53db33 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.drive; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Mockito.when; + +import com.google.api.client.http.HttpTransport; +import com.google.api.services.drive.Drive; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class FetchGoogleDriveTest extends AbstractGoogleDriveTest { + + @BeforeEach + protected void setUp() throws Exception { + final FetchGoogleDrive testSubject = new FetchGoogleDrive() { + @Override + public Drive createDriveService(ProcessContext context, HttpTransport httpTransport, String... 
scopes) { + return mockDriverService; + } + }; + + testRunner = TestRunners.newTestRunner(testSubject); + super.setUp(); + } + + @Test + void testFileFetchFileNameFromProperty() throws IOException { + testRunner.setProperty(FetchGoogleDrive.FILE_ID, TEST_FILE_ID); + + mockFileDownload(TEST_FILE_ID); + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(FetchGoogleDrive.REL_SUCCESS, 1); + assertFlowFileAttributes(FetchGoogleDrive.REL_SUCCESS); + assertProvenanceEvent(ProvenanceEventType.FETCH); + } + + @Test + void testFetchFileNameFromFlowFileAttribute() throws Exception { + final MockFlowFile mockFlowFile = new MockFlowFile(0); + final Map<String, String> attributes = new HashMap<>(); + attributes.put(GoogleDriveAttributes.ID, TEST_FILE_ID); + mockFlowFile.putAttributes(attributes); + + mockFileDownload(TEST_FILE_ID); + + testRunner.enqueue(mockFlowFile); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(FetchGoogleDrive.REL_SUCCESS, 1); + assertFlowFileAttributes(FetchGoogleDrive.REL_SUCCESS); + assertProvenanceEvent(ProvenanceEventType.FETCH); + } + + @Test + void testFileFetchError() throws Exception { + testRunner.setProperty(FetchGoogleDrive.FILE_ID, TEST_FILE_ID); + + mockFileDownloadError(TEST_FILE_ID, new RuntimeException("Error during download")); + + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(FetchGoogleDrive.REL_FAILURE, 1); + final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_FAILURE); + final MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(GoogleDriveAttributes.ERROR_MESSAGE, "Error during download"); + assertNoProvenanceEvent(); + } + + private void mockFileDownload(String fileId) throws IOException { + when(mockDriverService.files() + .get(fileId) + .executeMediaAsInputStream()).thenReturn(new ByteArrayInputStream(CONTENT.getBytes(UTF_8))); + + when(mockDriverService.files() + .get(fileId) + .setFields("id, name, createdTime, mimeType, size") 
+ .execute()).thenReturn(createFile()); + } + + private void mockFileDownloadError(String fileId, Exception exception) throws IOException { + when(mockDriverService.files() + .get(fileId) + .executeMediaAsInputStream()) + .thenThrow(exception); + } + + private void runWithFlowFile() { + testRunner.enqueue(new MockFlowFile(0)); + testRunner.run(); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveTestRunnerTest.java similarity index 92% rename from nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java rename to nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveTestRunnerTest.java index c6d2ab4261..af3d324005 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveTestRunnerTest.java @@ -16,10 +16,22 @@ */ package org.apache.nifi.processors.gcp.drive; +import static java.lang.String.valueOf; +import static java.util.Arrays.asList; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import com.google.api.client.http.HttpTransport; import com.google.api.client.util.DateTime; import com.google.api.services.drive.Drive; import com.google.api.services.drive.model.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import org.apache.nifi.json.JsonRecordSetWriter; import 
org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; @@ -32,19 +44,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class ListGoogleDriverTestRunnerTest implements OutputChecker { +public class ListGoogleDriveTestRunnerTest implements OutputChecker { private ListGoogleDrive testSubject; private TestRunner testRunner; @@ -126,7 +126,7 @@ public class ListGoogleDriverTestRunnerTest implements OutputChecker { mockFetchedGoogleDriveFileList(id, filename, size, createdTime, modifiedTime, mimeType); - List<String> expectedContents = Arrays.asList( + List<String> expectedContents = asList( "[" + "{" + "\"drive.id\":\"" + id + "\"," + @@ -159,7 +159,7 @@ public class ListGoogleDriverTestRunnerTest implements OutputChecker { .setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)") .execute() .getFiles() - ).thenReturn(Arrays.asList( + ).thenReturn(asList( createFile( id, filename, @@ -176,13 +176,13 @@ public class ListGoogleDriverTestRunnerTest implements OutputChecker { mockFetchedGoogleDriveFileList(id, filename, size, createdTime, modifiedTime, mimeType); Map<String, String> inputFlowFileAttributes = new HashMap<>(); - inputFlowFileAttributes.put("drive.id", id); - inputFlowFileAttributes.put("filename", filename); - inputFlowFileAttributes.put("drive.size", "" + size); - inputFlowFileAttributes.put("drive.timestamp", "" + expectedTimestamp); - inputFlowFileAttributes.put("mime.type", mimeType); + inputFlowFileAttributes.put(GoogleDriveAttributes.ID, id); + 
inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, filename); + inputFlowFileAttributes.put(GoogleDriveAttributes.SIZE, valueOf(size)); + inputFlowFileAttributes.put(GoogleDriveAttributes.TIMESTAMP, valueOf(expectedTimestamp)); + inputFlowFileAttributes.put(GoogleDriveAttributes.MIME_TYPE, mimeType); - HashSet<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(inputFlowFileAttributes)); + HashSet<Map<String, String>> expectedAttributes = new HashSet<>(asList(inputFlowFileAttributes)); // WHEN testRunner.run(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveIT.java new file mode 100644 index 0000000000..98ac347992 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveIT.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.drive; + +import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.CREATE_SUBFOLDER; +import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.FILE_NAME; +import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.FOLDER_ID; +import static org.apache.nifi.processors.gcp.drive.PutGoogleDrive.SUBFOLDER_NAME; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test. + */ +public class PutGoogleDriveIT extends AbstractGoogleDriveIT<PutGoogleDrive> implements OutputChecker { + + public static final String TEST_FILENAME = "testFileName"; + + @BeforeEach + public void init() throws Exception { + super.init(); + } + + @Override + public PutGoogleDrive createTestSubject() { + return new PutGoogleDrive(); + } + + @Test + void testUploadFileToFolderById() { + // GIVEN + testRunner.setProperty(FOLDER_ID, mainFolderId); + testRunner.setProperty(FILE_NAME, TEST_FILENAME); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + } + + @Test + void testUploadFileFolderByName() { + // GIVEN + testRunner.setProperty(SUBFOLDER_NAME, "testFolderNew"); + testRunner.setProperty(FOLDER_ID, mainFolderId); + testRunner.setProperty(FILE_NAME, TEST_FILENAME); + testRunner.setProperty(CREATE_SUBFOLDER, "true"); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS); + final MockFlowFile ff0 = flowFiles.get(0); + assertFlowFileAttributes(ff0); + } + + @Test + void testUploadFileCreateMultiLevelFolder() throws IOException { + createFolder("existingFolder", mainFolderId); + + // GIVEN + testRunner.setProperty(SUBFOLDER_NAME, "existingFolder/new1/new2"); + testRunner.setProperty(FOLDER_ID, mainFolderId); + testRunner.setProperty(FILE_NAME, TEST_FILENAME); + testRunner.setProperty(CREATE_SUBFOLDER, "true"); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + + final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS); + final MockFlowFile ff0 = flowFiles.get(0); + assertFlowFileAttributes(ff0); + } + + @Test + void testSpecifiedFolderIdDoesNotExist() { + // GIVEN + testRunner.setProperty(FOLDER_ID, "nonExistentId"); + testRunner.setProperty(FILE_NAME, "testFile4"); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 1); + + final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_FAILURE); + final MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(GoogleDriveAttributes.ERROR_CODE, "404"); + ff0.assertAttributeExists(GoogleDriveAttributes.ERROR_MESSAGE); + } + + @Test + void testUploadedFileAlreadyExistsFailResolution() { + // GIVEN + testRunner.setProperty(FOLDER_ID, mainFolderId); + testRunner.setProperty(FILE_NAME, TEST_FILENAME); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + testRunner.clearTransferState(); + + // WHEN + runWithFileContent(); + + // THEN + 
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 1); + + } + + @Test + void testUploadedFileAlreadyExistsOverwriteResolution() { + // GIVEN + testRunner.setProperty(FOLDER_ID, mainFolderId); + testRunner.setProperty(FILE_NAME, TEST_FILENAME); + testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.REPLACE_RESOLUTION); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + testRunner.clearTransferState(); + + // WHEN + runWithFileContent("012345678"); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + + final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS); + final MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(GoogleDriveAttributes.SIZE, "9"); + } + + @Test + void testUploadedFileAlreadyExistsIgnoreResolution() { + // GIVEN + testRunner.setProperty(FOLDER_ID, mainFolderId); + testRunner.setProperty(FILE_NAME, TEST_FILENAME); + testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.IGNORE_RESOLUTION); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + testRunner.clearTransferState(); + + // WHEN + runWithFileContent(); + + // THEN + testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0); + } + + private void runWithFileContent() { + runWithFileContent(DEFAULT_FILE_CONTENT); + } + + private void runWithFileContent(String content) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + 
testRunner.enqueue(content, attributes); + testRunner.run(); + } + + private void assertFlowFileAttributes(MockFlowFile flowFile) { + flowFile.assertAttributeExists(GoogleDriveAttributes.ID); + flowFile.assertAttributeEquals(GoogleDriveAttributes.FILENAME, TEST_FILENAME); + flowFile.assertAttributeExists(GoogleDriveAttributes.TIMESTAMP); + flowFile.assertAttributeEquals(GoogleDriveAttributes.SIZE, String.valueOf(DEFAULT_FILE_CONTENT.length())); + flowFile.assertAttributeEquals(GoogleDriveAttributes.MIME_TYPE, "text/plain"); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveTest.java new file mode 100644 index 0000000000..b9e45e093d --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.gcp.drive; + +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.singletonList; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveTrait.DRIVE_FOLDER_MIME_TYPE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.InputStreamContent; +import com.google.api.services.drive.Drive; +import com.google.api.services.drive.model.File; +import com.google.api.services.drive.model.FileList; +import com.google.gson.JsonParseException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class PutGoogleDriveTest extends AbstractGoogleDriveTest { + + @BeforeEach + protected void setUp() throws Exception { + final PutGoogleDrive testSubject = new PutGoogleDrive() { + @Override + public Drive createDriveService(ProcessContext context, HttpTransport httpTransport, String... 
scopes) { + return mockDriverService; + } + }; + + testRunner = TestRunners.newTestRunner(testSubject); + super.setUp(); + testRunner.setProperty(PutGoogleDrive.FOLDER_ID, SHARED_FOLDER_ID); + } + + @Test + void testUploadChunkSizeValidity() { + testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, ""); + testRunner.assertNotValid(); + testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "40 MB"); + testRunner.assertValid(); + testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "1024"); + testRunner.assertNotValid(); + testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "510 KB"); + testRunner.assertNotValid(); + testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "2 GB"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_THRESHOLD, "100 MB"); + testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "110 MB"); + testRunner.assertNotValid(); + } + + @Test + void testSubfolderNameValidity() { + testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "sub1"); + testRunner.assertValid(); + testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "sub1/sub2"); + testRunner.assertValid(); + testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "/sub1"); + testRunner.assertNotValid(); + testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "/"); + testRunner.assertNotValid(); + testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "sub1/"); + testRunner.assertNotValid(); + testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, "/sub1/"); + testRunner.assertNotValid(); + } + + @Test + void testFileUploadFileNameFromProperty() throws Exception { + testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME); + + mockFileUpload(createFile()); + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1); + assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + @Test + void 
testFileUploadFileNameFromFlowFileAttribute() throws Exception { + testRunner.setProperty(PutGoogleDrive.FOLDER_ID, SHARED_FOLDER_ID); + + mockFileUpload(createFile()); + + final MockFlowFile mockFlowFile = getMockFlowFile(); + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), TEST_FILENAME); + mockFlowFile.putAttributes(attributes); + testRunner.enqueue(mockFlowFile); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1); + assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + @Test + void testFileUploadFileToFolderSpecifiedByNameFolderExists() throws Exception { + testRunner.setProperty(PutGoogleDrive.SUBFOLDER_NAME, SUBFOLDER_NAME); + testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME); + + when(mockDriverService.files() + .list() + .setQ(format("mimeType='%s' and name='%s' and ('%s' in parents)", DRIVE_FOLDER_MIME_TYPE, SUBFOLDER_NAME, SHARED_FOLDER_ID)) + .setFields("files(name, id)") + .execute()) + .thenReturn(new FileList().setFiles(singletonList(createFile(SUBFOLDER_ID, SUBFOLDER_NAME, SHARED_FOLDER_ID, DRIVE_FOLDER_MIME_TYPE)))); + + mockFileUpload(createFile()); + + runWithFlowFile(); + testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1); + assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + @Test + void testFileUploadError() throws Exception { + testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME); + + final JsonParseException exception = new JsonParseException("Google Drive error", new FileNotFoundException()); + mockFileUploadError(exception); + + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_FAILURE); + MockFlowFile ff0 = flowFiles.get(0); + 
ff0.assertAttributeExists(ERROR_MESSAGE); + assertNoProvenanceEvent(); + } + + @Test + void testFileAlreadyExistsFailResolution() throws Exception { + testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME); + + mockFileExists(); + + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_FAILURE, 1); + assertNoProvenanceEvent(); + } + + @Test + void testFileAlreadyExistsIgnoreResolution() throws Exception { + testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME); + testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.IGNORE_RESOLUTION); + + mockFileExists(); + + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1); + final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(GoogleDriveAttributes.ID, TEST_FILE_ID); + flowFile.assertAttributeEquals(GoogleDriveAttributes.FILENAME, TEST_FILENAME); + assertNoProvenanceEvent(); + } + + @Test + void testFileAlreadyExistsOverwriteResolution() throws Exception { + testRunner.setProperty(PutGoogleDrive.FILE_NAME, TEST_FILENAME); + testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, PutGoogleDrive.REPLACE_RESOLUTION); + + mockFileExists(); + + mockFileUpdate(createFile()); + + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS, 1); + assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + private MockFlowFile getMockFlowFile() { + MockFlowFile inputFlowFile = new MockFlowFile(0); + inputFlowFile.setData(CONTENT.getBytes(UTF_8)); + return inputFlowFile; + } + + private void runWithFlowFile() { + MockFlowFile mockFlowFile = getMockFlowFile(); + testRunner.enqueue(mockFlowFile); + testRunner.run(); + } + + private void mockFileUpload(File uploadedFile) throws IOException { + when(mockDriverService.files() + .create(any(File.class), 
any(InputStreamContent.class)) + .setFields("id, name, createdTime, mimeType, size") + .execute()) + .thenReturn(uploadedFile); + } + + private void mockFileUpdate(File uploadedFile) throws IOException { + when(mockDriverService.files() + .update(eq(uploadedFile.getId()), any(File.class), any(InputStreamContent.class)) + .setFields("id, name, createdTime, mimeType, size") + .execute()) + .thenReturn(uploadedFile); + } + + private void mockFileUploadError(Exception exception) throws IOException { + when(mockDriverService.files() + .create(any(File.class), any(InputStreamContent.class))) + .thenThrow(exception); + } + + private void mockFileExists() throws IOException { + when(mockDriverService.files() + .list() + .setQ(format("name='%s' and ('%s' in parents)", TEST_FILENAME, SHARED_FOLDER_ID)) + .setFields("files(name, id)") + .execute()) + .thenReturn(new FileList().setFiles(singletonList(createFile()))); + } +} \ No newline at end of file