This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fae3bb9a8 NIFI-10281 - Added FetchGoogleDrive processor. Consolidated 
record- and attribute output for ListGoogleDrive. (Record fields and flowfile 
attributes are now the same.)
2fae3bb9a8 is described below

commit 2fae3bb9a8a8c046b9430469b16d66044a39d9dc
Author: Tamas Palfy <tpa...@apache.org>
AuthorDate: Tue Jul 26 20:00:23 2022 +0200

    NIFI-10281 - Added FetchGoogleDrive processor. Consolidated record- and 
attribute output for ListGoogleDrive. (Record fields and flowfile attributes 
are now the same.)
    
    This closes #6248.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../processors/gcp/drive/FetchGoogleDrive.java     | 238 ++++++++++++++
 .../processors/gcp/drive/GoogleDriveFileInfo.java  |  21 +-
 .../gcp/drive/GoogleDriveFlowFileAttribute.java    |  56 ++++
 .../processors/gcp/drive/GoogleDriveTrait.java     |  69 ++++
 .../nifi/processors/gcp/drive/ListGoogleDrive.java |  76 ++---
 .../services/org.apache.nifi.processor.Processor   |   3 +-
 .../gcp/drive/AbstractGoogleDriveIT.java           | 153 +++++++++
 .../processors/gcp/drive/FetchGoogleDriveIT.java   | 361 +++++++++++++++++++++
 .../processors/gcp/drive/ListGoogleDriveIT.java    | 130 +-------
 .../gcp/drive/ListGoogleDriveSimpleTest.java       | 152 +++++++++
 .../gcp/drive/ListGoogleDriverTestRunnerTest.java  | 219 +++++++++++++
 .../nifi/processors/gcp/drive/OutputChecker.java   |  70 ++++
 12 files changed, 1371 insertions(+), 177 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
new file mode 100644
index 0000000000..9804999854
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveScopes;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.processors.gcp.util.GoogleUtils;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "fetch"})
+@CapabilityDescription("Fetches files from a Google Drive Folder. Designed to 
be used in tandem with ListGoogleDrive.")
+@SeeAlso({ListGoogleDrive.class})
+@WritesAttributes({
+        @WritesAttribute(attribute = FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, 
description = "The error code returned by Google Drive when the fetch of a file 
fails"),
+        @WritesAttribute(attribute = FetchGoogleDrive.ERROR_MESSAGE_ATTRIBUTE, 
description = "The error message returned by Google Drive when the fetch of a 
file fails")
+})
+public class FetchGoogleDrive extends AbstractProcessor implements 
GoogleDriveTrait {
+    public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+    public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+    public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
+            .Builder().name("drive-file-id")
+            .displayName("File ID")
+            .description("The Drive ID of the File to fetch")
+            .required(true)
+            .defaultValue("${drive.id}")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for reading 
incoming Google Driver File meta-data as NiFi Records."
+                    + " If not set, the Processor expects as attributes of a 
separate flowfile for each File to fetch.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(false)
+            .build();
+
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder()
+                    .name("success")
+                    .description("A flowfile will be routed here for each 
successfully fetched File.")
+                    .build();
+
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description("A flowfile will be routed here for each File 
for which fetch was attempted but failed.")
+                    .build();
+
+    public static final Relationship REL_INPUT_FAILURE =
+            new Relationship.Builder().name("input_failure")
+                    .description("The incoming flowfile will be routed here if 
it's content could not be processed.")
+                    .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
+            FILE_ID,
+            RECORD_READER,
+            ProxyConfiguration.createProxyConfigPropertyDescriptor(false, 
ProxyAwareTransportFactory.PROXY_SPECS)
+    ));
+
+    public static final Set<Relationship> relationships = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE,
+            REL_INPUT_FAILURE
+    )));
+
+    private volatile Drive driveService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        final ProxyConfiguration proxyConfiguration = 
ProxyConfiguration.getConfiguration(context);
+
+        driveService = createDriveService(
+                context,
+                new ProxyAwareTransportFactory(proxyConfiguration).create(),
+                DriveScopes.DRIVE, DriveScopes.DRIVE_FILE
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (context.getProperty(RECORD_READER).isSet()) {
+            RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+            try (InputStream inFlowFile = session.read(flowFile)) {
+                final Map<String, String> flowFileAttributes = 
flowFile.getAttributes();
+                final RecordReader reader = 
recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, 
flowFile.getSize(), getLogger());
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    String fileId = record.getAsString(GoogleDriveFileInfo.ID);
+                    FlowFile outFlowFile = session.create(flowFile);
+                    try {
+                        addAttributes(session, outFlowFile, record);
+
+                        fetchFile(fileId, session, outFlowFile);
+
+                        session.transfer(outFlowFile, REL_SUCCESS);
+                    } catch (GoogleJsonResponseException e) {
+                        handleErrorResponse(session, fileId, outFlowFile, e);
+                    } catch (Exception e) {
+                        handleUnexpectedError(session, outFlowFile, fileId, e);
+                    }
+                }
+                session.remove(flowFile);
+            } catch (IOException | MalformedRecordException | 
SchemaNotFoundException e) {
+                getLogger().error("Couldn't read file metadata content as 
records from incoming flowfile", e);
+
+                session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, 
e.getMessage());
+
+                session.transfer(flowFile, REL_INPUT_FAILURE);
+            } catch (Exception e) {
+                getLogger().error("Unexpected error while processing incoming 
flowfile", e);
+
+                session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, 
e.getMessage());
+
+                session.transfer(flowFile, REL_INPUT_FAILURE);
+            }
+        } else {
+            String fileId = 
context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+            FlowFile outFlowFile = flowFile;
+            try {
+                fetchFile(fileId, session, outFlowFile);
+
+                session.transfer(outFlowFile, REL_SUCCESS);
+            } catch (GoogleJsonResponseException e) {
+                handleErrorResponse(session, fileId, flowFile, e);
+            } catch (Exception e) {
+                handleUnexpectedError(session, flowFile, fileId, e);
+            }
+        }
+        session.commitAsync();
+    }
+
+    private void addAttributes(ProcessSession session, FlowFile outFlowFile, 
Record record) {
+        Map<String, String> attributes = new HashMap<>();
+
+        for (GoogleDriveFlowFileAttribute attribute : 
GoogleDriveFlowFileAttribute.values()) {
+            Optional.ofNullable(attribute.getValue(record))
+                    .ifPresent(value -> attributes.put(attribute.getName(), 
value));
+        }
+
+        session.putAllAttributes(outFlowFile, attributes);
+    }
+
+    void fetchFile(String fileId, ProcessSession session, FlowFile 
outFlowFile) throws IOException {
+        InputStream driveFileInputStream = driveService
+                .files()
+                .get(fileId)
+                .executeMediaAsInputStream();
+
+        session.importFrom(driveFileInputStream, outFlowFile);
+    }
+
+    private void handleErrorResponse(ProcessSession session, String fileId, 
FlowFile outFlowFile, GoogleJsonResponseException e) {
+        getLogger().error("Couldn't fetch file with id '{}'", fileId, e);
+
+        session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + 
e.getStatusCode());
+        session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, 
e.getMessage());
+
+        session.transfer(outFlowFile, REL_FAILURE);
+    }
+
+    private void handleUnexpectedError(ProcessSession session, FlowFile 
flowFile, String fileId, Exception e) {
+        getLogger().error("Unexpected error while fetching and processing file 
with id '{}'", fileId, e);
+
+        session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, "N/A");
+        session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, 
e.getMessage());
+
+        session.transfer(flowFile, REL_FAILURE);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java
index 6395757f0b..f4a1a0a3cf 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java
@@ -30,23 +30,23 @@ import java.util.List;
 import java.util.Map;
 
 public class GoogleDriveFileInfo implements ListableEntity {
+    public static final String ID = "drive.id";
+    public static final String FILENAME = "filename";
+    public static final String SIZE = "drive.size";
+    public static final String TIMESTAMP = "drive.timestamp";
+    public static final String MIME_TYPE = "mime.type";
 
-    private static final RecordSchema SCHEMA;
-    private static final String ID = "id";
-    private static final String FILENAME = "filename";
-    private static final String SIZE = "size";
-    private static final String CREATED_TIME = "createdTime";
-    private static final String MODIFIED_TIME = "modifiedTime";
-    private static final String MIME_TYPE = "mimeType";
+    private  static final RecordSchema SCHEMA;
 
     static {
         final List<RecordField> recordFields = new ArrayList<>();
+
         recordFields.add(new RecordField(ID, 
RecordFieldType.STRING.getDataType(), false));
         recordFields.add(new RecordField(FILENAME, 
RecordFieldType.STRING.getDataType(), false));
         recordFields.add(new RecordField(SIZE, 
RecordFieldType.LONG.getDataType(), false));
-        recordFields.add(new RecordField(CREATED_TIME, 
RecordFieldType.TIMESTAMP.getDataType(), false));
-        recordFields.add(new RecordField(MODIFIED_TIME, 
RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(TIMESTAMP, 
RecordFieldType.LONG.getDataType(), false));
         recordFields.add(new RecordField(MIME_TYPE, 
RecordFieldType.STRING.getDataType()));
+
         SCHEMA = new SimpleRecordSchema(recordFields);
     }
 
@@ -84,8 +84,7 @@ public class GoogleDriveFileInfo implements ListableEntity {
         values.put(ID, getId());
         values.put(FILENAME, getName());
         values.put(SIZE, getSize());
-        values.put(CREATED_TIME, getCreatedTime());
-        values.put(MODIFIED_TIME, getModifiedTime());
+        values.put(TIMESTAMP, getTimestamp());
         values.put(MIME_TYPE, getMimeType());
 
         return new MapRecord(SCHEMA, values);
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java
new file mode 100644
index 0000000000..19f73fa398
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+public enum GoogleDriveFlowFileAttribute {
+    ID(GoogleDriveFileInfo.ID, GoogleDriveFileInfo::getId),
+    FILE_NAME(GoogleDriveFileInfo.FILENAME, GoogleDriveFileInfo::getName),
+    SIZE(GoogleDriveFileInfo.SIZE, fileInfo -> 
Optional.ofNullable(fileInfo.getSize())
+            .map(String::valueOf)
+            .orElse(null)
+    ),
+    TIME_STAMP(GoogleDriveFileInfo.TIMESTAMP, fileInfo -> 
Optional.ofNullable(fileInfo.getTimestamp())
+            .map(String::valueOf)
+            .orElse(null)
+    ),
+    MIME_TYPE(GoogleDriveFileInfo.MIME_TYPE, GoogleDriveFileInfo::getMimeType);
+
+    private final String name;
+    private final Function<GoogleDriveFileInfo, String> fromFileInfo;
+
+    GoogleDriveFlowFileAttribute(String attributeName, 
Function<GoogleDriveFileInfo, String> fromFileInfo) {
+        this.name = attributeName;
+        this.fromFileInfo = fromFileInfo;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getValue(Record record) {
+        return record.getAsString(name);
+    }
+
+    public String getValue(GoogleDriveFileInfo fileInfo) {
+        return fromFileInfo.apply(fileInfo);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java
new file mode 100644
index 0000000000..5794c0b598
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.services.drive.Drive;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.auth.oauth2.GoogleCredentials;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.gcp.util.GoogleUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public interface GoogleDriveTrait {
+    String APPLICATION_NAME = "NiFi";
+
+    JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
+
+    default Drive createDriveService(ProcessContext context, HttpTransport 
httpTransport, String... scopes) {
+        Drive driveService = new Drive.Builder(
+                httpTransport,
+                JSON_FACTORY,
+                getHttpCredentialsAdapter(
+                        context,
+                        Arrays.asList(scopes)
+                )
+        )
+                .setApplicationName(APPLICATION_NAME)
+                .build();
+
+        return driveService;
+    }
+
+    default HttpCredentialsAdapter getHttpCredentialsAdapter(
+            final ProcessContext context,
+            final Collection<String> scopes
+    ) {
+        GoogleCredentials googleCredentials = getGoogleCredentials(context);
+
+        HttpCredentialsAdapter httpCredentialsAdapter = new 
HttpCredentialsAdapter(googleCredentials.createScoped(scopes));
+
+        return httpCredentialsAdapter;
+    }
+
+    default GoogleCredentials getGoogleCredentials(final ProcessContext 
context) {
+        final GCPCredentialsService gcpCredentialsService = 
context.getProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE)
+                .asControllerService(GCPCredentialsService.class);
+
+        return gcpCredentialsService.getGoogleCredentials();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java
index b141f1ca53..e5fcbf8f20 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java
@@ -17,14 +17,11 @@
 package org.apache.nifi.processors.gcp.drive;
 
 import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.client.util.DateTime;
 import com.google.api.services.drive.Drive;
 import com.google.api.services.drive.DriveScopes;
 import com.google.api.services.drive.model.File;
 import com.google.api.services.drive.model.FileList;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.auth.oauth2.GoogleCredentials;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -33,6 +30,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -41,7 +39,6 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
@@ -64,6 +61,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 @PrimaryNodeOnly
@@ -74,24 +72,20 @@ import java.util.concurrent.TimeUnit;
         "Or - in case the 'Record Writer' property is set - the entire result 
is written as records to a single flowfile. " +
         "This Processor is designed to run on Primary Node only in a cluster. 
If the primary node changes, the new Primary Node will pick up where the " +
         "previous node left off without duplicating all of the data.")
+@SeeAlso({FetchGoogleDrive.class})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@WritesAttributes({@WritesAttribute(attribute = "drive.id", description = "The 
id of the file"),
-        @WritesAttribute(attribute = "filename", description = "The name of 
the file"),
-        @WritesAttribute(attribute = "drive.size", description = "The size of 
the file"),
-        @WritesAttribute(attribute = "drive.timestamp", description = "The 
last modified time or created time (whichever is greater) of the file." +
+@WritesAttributes({@WritesAttribute(attribute = GoogleDriveFileInfo.ID, 
description = "The id of the file"),
+        @WritesAttribute(attribute = GoogleDriveFileInfo.FILENAME, description 
= "The name of the file"),
+        @WritesAttribute(attribute = GoogleDriveFileInfo.SIZE, description = 
"The size of the file"),
+        @WritesAttribute(attribute = GoogleDriveFileInfo.TIMESTAMP, 
description = "The last modified time or created time (whichever is greater) of 
the file." +
                 " The reason for this is that the original modified date of a 
file is preserved when uploaded to Google Drive." +
                 " 'Created time' takes the time when the upload occurs. 
However uploaded files can still be modified later."),
-        @WritesAttribute(attribute = "mime.type", description = "MimeType of 
the file")})
+        @WritesAttribute(attribute = GoogleDriveFileInfo.MIME_TYPE, 
description = "MimeType of the file")})
 @Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores 
necessary data to be able to keep track what files have been listed already." +
         " What exactly needs to be stored depends on the 'Listing Strategy'." +
         " State is stored across the cluster so that this Processor can be run 
on Primary Node only and if a new Primary Node is selected, the new node can 
pick up" +
         " where the previous node left off, without duplicating the data.")
-public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo> {
-    private static final String APPLICATION_NAME = "NiFi";
-    private static final JsonFactory JSON_FACTORY = 
GsonFactory.getDefaultInstance();
-
-    private volatile HttpTransport httpTransport;
-
+public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo> implements GoogleDriveTrait {
     public static final PropertyDescriptor FOLDER_ID = new 
PropertyDescriptor.Builder()
             .name("folder-id")
             .displayName("Folder ID")
@@ -156,6 +150,8 @@ public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo>
             ProxyConfiguration.createProxyConfigPropertyDescriptor(false, 
ProxyAwareTransportFactory.PROXY_SPECS)
     ));
 
+    private volatile Drive driveService;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
@@ -171,11 +167,11 @@ public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo>
             final ProcessContext context
     ) {
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put("drive.id", entity.getId());
-        attributes.put("filename", entity.getName());
-        attributes.put("drive.size", String.valueOf(entity.getSize()));
-        attributes.put("drive.timestamp", 
String.valueOf(entity.getTimestamp()));
-        attributes.put("mime.type", entity.getMimeType());
+
+        for (GoogleDriveFlowFileAttribute attribute : 
GoogleDriveFlowFileAttribute.values()) {
+            Optional.ofNullable(attribute.getValue(entity))
+                    .ifPresent(value -> attributes.put(attribute.getName(), 
value));
+        }
 
         return attributes;
     }
@@ -183,7 +179,10 @@ public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo>
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException {
         final ProxyConfiguration proxyConfiguration = 
ProxyConfiguration.getConfiguration(context);
-        httpTransport = new 
ProxyAwareTransportFactory(proxyConfiguration).create();
+
+        HttpTransport httpTransport = new 
ProxyAwareTransportFactory(proxyConfiguration).create();
+
+        driveService = createDriveService(context, httpTransport, 
DriveScopes.DRIVE_METADATA_READONLY);
     }
 
     @Override
@@ -230,17 +229,6 @@ public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo>
         final Boolean recursive = 
context.getProperty(RECURSIVE_SEARCH).asBoolean();
         final Long minAge = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
 
-        Drive driveService = new Drive.Builder(
-                this.httpTransport,
-                JSON_FACTORY,
-                getHttpCredentialsAdapter(
-                        context,
-                        Arrays.asList(DriveScopes.DRIVE_METADATA_READONLY)
-                )
-        )
-                .setApplicationName(APPLICATION_NAME)
-                .build();
-
         StringBuilder queryBuilder = new StringBuilder();
         queryBuilder.append(buildQueryForDirs(driveService, folderId, 
recursive));
         queryBuilder.append(" and (mimeType != 
'application/vnd.google-apps.folder')");
@@ -276,8 +264,8 @@ public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo>
                         .id(file.getId())
                         .fileName(file.getName())
                         .size(file.getSize())
-                        .createdTime(file.getCreatedTime().getValue())
-                        .modifiedTime(file.getModifiedTime().getValue())
+                        
.createdTime(Optional.ofNullable(file.getCreatedTime()).map(DateTime::getValue).orElse(0L))
+                        
.modifiedTime(Optional.ofNullable(file.getModifiedTime()).map(DateTime::getValue).orElse(0L))
                         .mimeType(file.getMimeType());
 
                 listing.add(builder.build());
@@ -294,24 +282,6 @@ public class ListGoogleDrive extends 
AbstractListProcessor<GoogleDriveFileInfo>
         return performListing(context, null, 
ListingMode.CONFIGURATION_VERIFICATION).size();
     }
 
-    HttpCredentialsAdapter getHttpCredentialsAdapter(
-            final ProcessContext context,
-            final Collection<String> scopes
-    ) {
-        GoogleCredentials googleCredentials = getGoogleCredentials(context);
-
-        HttpCredentialsAdapter httpCredentialsAdapter = new 
HttpCredentialsAdapter(googleCredentials.createScoped(scopes));
-
-        return httpCredentialsAdapter;
-    }
-
-    private GoogleCredentials getGoogleCredentials(final ProcessContext 
context) {
-        final GCPCredentialsService gcpCredentialsService = 
context.getProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE)
-                .asControllerService(GCPCredentialsService.class);
-
-        return gcpCredentialsService.getGoogleCredentials();
-    }
-
     private static String buildQueryForDirs(
             final Drive service,
             final String folderId,
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 32001116d7..799e648ff3 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -22,4 +22,5 @@ org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
 org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
 org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
 org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
-org.apache.nifi.processors.gcp.drive.ListGoogleDrive
\ No newline at end of file
+org.apache.nifi.processors.gcp.drive.ListGoogleDrive
+org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
new file mode 100644
index 0000000000..0176803602
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.AbstractInputStreamContent;
+import com.google.api.client.http.ByteArrayContent;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import org.apache.nifi.processor.Processor;
+import 
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.util.GoogleUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Set the following constants before running:<br />
+ * <br />
+ * CREDENTIAL_JSON_FILE_PATH - A Service Account credentials JSON file.<br />
+ * SHARED_FOLDER_ID - The ID of a Folder that is shared with the Service 
Account. The test will create files and sub-folders within this folder.<br />
+ * <br />
+ * Created files and folders are cleaned up, but it's advisable to dedicate a 
folder for this test so that it can be cleaned up easily should the test fail 
to do so.
+ * <br /><br />
+ * WARNING: The creation of a file is not a synchronized operation, may need 
to adjust tests accordingly!
+ */
+public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & 
Processor> {
+    private static final String CREDENTIAL_JSON_FILE_PATH = "";
+    private static final String SHARED_FOLDER_ID = "";
+
+    protected static final String DEFAULT_FILE_CONTENT = "test_content";
+
+    private static final JsonFactory JSON_FACTORY = 
GsonFactory.getDefaultInstance();
+
+    protected T testSubject;
+    protected TestRunner testRunner;
+
+    protected Drive driveService;
+
+    protected String mainFolderId;
+
+    protected abstract T createTestSubject();
+
+    @BeforeEach
+    protected void init() throws Exception {
+        testSubject = createTestSubject();
+        testRunner = createTestRunner();
+
+        NetHttpTransport httpTransport = 
GoogleNetHttpTransport.newTrustedTransport();
+
+        driveService = new Drive.Builder(
+                httpTransport,
+                JSON_FACTORY,
+                testSubject.getHttpCredentialsAdapter(
+                        testRunner.getProcessContext(),
+                        DriveScopes.all()
+                )
+        )
+                .setApplicationName(this.getClass().getSimpleName())
+                .build();
+
+        File mainFolder = createFolder("main", SHARED_FOLDER_ID);
+        mainFolderId = mainFolder.getId();
+    }
+
+    @AfterEach
+    protected void tearDown() throws Exception {
+        if (driveService != null) {
+            driveService.files()
+                    .delete(mainFolderId)
+                    .execute();
+        }
+    }
+
+    protected TestRunner createTestRunner() throws Exception {
+        TestRunner testRunner = TestRunners.newTestRunner(testSubject);
+
+        GCPCredentialsControllerService gcpCredentialsControllerService = new 
GCPCredentialsControllerService();
+        testRunner.addControllerService("gcp_credentials_provider_service", 
gcpCredentialsControllerService);
+        testRunner.setProperty(gcpCredentialsControllerService, 
CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, 
CREDENTIAL_JSON_FILE_PATH);
+        testRunner.enableControllerService(gcpCredentialsControllerService);
+        testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, 
"gcp_credentials_provider_service");
+
+        return testRunner;
+    }
+
+    protected File createFolder(String folderName, String... parentFolderIds) 
throws IOException {
+        File fileMetaData = new File();
+        fileMetaData.setName(folderName);
+
+        if (parentFolderIds != null) {
+            fileMetaData.setParents(Arrays.asList(parentFolderIds));
+        }
+
+        fileMetaData.setMimeType("application/vnd.google-apps.folder");
+
+        Drive.Files.Create create = driveService.files()
+                .create(fileMetaData)
+                .setFields("id");
+
+        File file = create.execute();
+
+        return file;
+    }
+
+    protected File createFileWithDefaultContent(String name, String... 
folderIds) throws IOException {
+        return createFile(name, DEFAULT_FILE_CONTENT, folderIds);
+    }
+
+    protected File createFile(String name, String fileContent, String... 
folderIds) throws IOException {
+        File fileMetadata = new File();
+        fileMetadata.setName(name);
+        fileMetadata.setParents(Arrays.asList(folderIds));
+
+        AbstractInputStreamContent content = new 
ByteArrayContent("text/plain", fileContent.getBytes(StandardCharsets.UTF_8));
+
+        Drive.Files.Create create = driveService.files()
+                .create(fileMetadata, content)
+                .setFields("id, name, modifiedTime");
+
+        File file = create.execute();
+
+        return file;
+    }
+
+    public TestRunner getTestRunner() {
+        return testRunner;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
new file mode 100644
index 0000000000..48487b2609
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import com.google.api.services.drive.model.File;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this 
test.
+ */
+public class FetchGoogleDriveIT extends 
AbstractGoogleDriveIT<FetchGoogleDrive> implements OutputChecker {
+    @Override
+    public FetchGoogleDrive createTestSubject() {
+        FetchGoogleDrive testSubject = new FetchGoogleDrive();
+
+        return testSubject;
+    }
+
+    @Test
+    void testFetchSingleFileByInputAttributes() throws Exception {
+        // GIVEN
+        File file = createFileWithDefaultContent("test_file.txt", 
mainFolderId);
+
+        Map<String, String> inputFlowFileAttributes = new HashMap<>();
+        inputFlowFileAttributes.put("drive.id", file.getId());
+        inputFlowFileAttributes.put("filename", file.getName());
+
+        HashSet<Map<String, String>> expectedAttributes = new 
HashSet<>(Arrays.asList(inputFlowFileAttributes));
+        List<String> expectedContent = Arrays.asList(DEFAULT_FILE_CONTENT);
+
+        // WHEN
+        testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
+
+        checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
+        checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
+    }
+
+    @Test
+    void testInputFlowFileReferencesMissingFile() throws Exception {
+        // GIVEN
+        Map<String, String> inputFlowFileAttributes = new HashMap<>();
+        inputFlowFileAttributes.put("drive.id", "missing");
+        inputFlowFileAttributes.put("filename", "missing_filename");
+
+        Set<Map<String, String>> expectedFailureAttributes = new 
HashSet<>(Arrays.asList(
+                new HashMap<String, String>() {{
+                    put("drive.id", "missing");
+                    put("filename", "missing_filename");
+                    put("error.code", "404");
+                }}
+        ));
+
+        // WHEN
+        testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
+
+        checkAttributes(FetchGoogleDrive.REL_FAILURE, 
expectedFailureAttributes);
+    }
+
+    @Test
+    void testInputFlowFileThrowsExceptionBeforeFetching() throws Exception {
+        // GIVEN
+        File file = createFileWithDefaultContent("test_file.txt", 
mainFolderId);
+
+        Map<String, String> inputFlowFileAttributes = new HashMap<>();
+        inputFlowFileAttributes.put("drive.id", file.getId());
+        inputFlowFileAttributes.put("filename", file.getName());
+
+        MockFlowFile input = new MockFlowFile(1) {
+            AtomicBoolean throwException = new AtomicBoolean(true);
+
+            @Override
+            public boolean isPenalized() {
+                // We want to throw exception only once because the exception 
handling itself calls this again
+                if (throwException.get()) {
+                    throwException.set(false);
+                    throw new RuntimeException("Intentional exception");
+                } else {
+                    return super.isPenalized();
+                }
+            }
+
+            @Override
+            public Map<String, String> getAttributes() {
+                return inputFlowFileAttributes;
+            }
+        };
+
+        Set<Map<String, String>> expectedFailureAttributes = new 
HashSet<>(Arrays.asList(
+                new HashMap<String, String>() {{
+                    putAll(inputFlowFileAttributes);
+                    put("error.code", "N/A");
+                }}
+        ));
+
+        // WHEN
+        testRunner.enqueue(input);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
+
+        checkAttributes(FetchGoogleDrive.REL_FAILURE, 
expectedFailureAttributes);
+    }
+
+    @Test
+    void testFetchMultipleFilesByInputRecords() throws Exception {
+        // GIVEN
+        addJsonRecordReaderFactory();
+
+        File file1 = createFile("test_file_1.txt", "test_content_1", 
mainFolderId);
+        File file2 = createFile("test_file_2.txt", "test_content_2", 
mainFolderId);
+
+        String input = "[" +
+                "{" +
+                "\"drive.id\":\"" + file1.getId() + "\"," +
+                "\"filename\":\"" + file1.getName() + "\"" +
+                "}," +
+                "{" +
+                "\"drive.id\":\"" + file2.getId() + "\"," +
+                "\"filename\":\"" + file2.getName() + "\"" +
+                "}" +
+                "]";
+
+        List<String> expectedContent = Arrays.asList(
+                "test_content_1",
+                "test_content_2"
+        );
+
+        Set<Map<String, String>> expectedAttributes = new 
HashSet<>(Arrays.asList(
+                new HashMap<String, String>() {{
+                    put("drive.id", "" + file1.getId());
+                    put("filename", file1.getName());
+                }},
+                new HashMap<String, String>() {{
+                    put("drive.id", "" + file2.getId());
+                    put("filename", file2.getName());
+                }}
+        ));
+
+        // WHEN
+        testRunner.enqueue(input);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
+
+        checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent);
+        checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
+    }
+
+    @Test
+    void testInputRecordReferencesMissingFile() throws Exception {
+        // GIVEN
+        addJsonRecordReaderFactory();
+
+        String input = "[" +
+                "{" +
+                "\"drive.id\":\"missing\"," +
+                "\"filename\":\"missing_filename\"" +
+                "}" +
+                "]";
+
+        Set<Map<String, String>> expectedFailureAttributes = new 
HashSet<>(Arrays.asList(
+                new HashMap<String, String>() {{
+                    put("drive.id", "missing");
+                    put("filename", "missing_filename");
+                    put("error.code", "404");
+                }}
+        ));
+
+        // WHEN
+        testRunner.enqueue(input);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
+
+        checkAttributes(FetchGoogleDrive.REL_FAILURE, 
expectedFailureAttributes);
+    }
+
+    @Test
+    void testInputRecordsAreInvalid() throws Exception {
+        // GIVEN
+        addJsonRecordReaderFactory();
+
+        String input = "invalid json";
+
+        List<String> expectedContents = Arrays.asList("invalid json");
+
+        // WHEN
+        testRunner.enqueue(input);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
+
+        checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
+    }
+
+    @Test
+    void testThrowExceptionBeforeRecordsAreProcessed() throws Exception {
+        // GIVEN
+        addJsonRecordReaderFactory();
+
+        File file = createFile("test_file.txt", mainFolderId);
+
+        String validInputContent = "[" +
+                "{" +
+                "\"drive.id\":\"" + file.getId() + "\"," +
+                "\"filename\":\"" + file.getName() + "\"" +
+                "}" +
+                "]";
+
+        MockFlowFile input = new MockFlowFile(1) {
+            @Override
+            public Map<String, String> getAttributes() {
+                throw new RuntimeException("Intentional exception");
+            }
+
+            @Override
+            public String getContent() {
+                return validInputContent;
+            }
+        };
+
+        List<String> expectedContents = Arrays.asList(validInputContent);
+
+        // WHEN
+        testRunner.enqueue(input);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);
+
+        checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents);
+    }
+
+    @Test
+    void testOneInputRecordOutOfManyThrowsUnexpectedException() throws 
Exception {
+        // GIVEN
+        AtomicReference<String> fileIdToThrowException = new 
AtomicReference<>();
+
+        testSubject = new FetchGoogleDrive() {
+            @Override
+            void fetchFile(String fileId, ProcessSession session, FlowFile 
outFlowFile) throws IOException {
+                if (fileId.equals(fileIdToThrowException.get())) {
+                    throw new RuntimeException(fileId + " intentionally forces 
exception");
+                }
+                super.fetchFile(fileId, session, outFlowFile);
+            }
+        };
+        testRunner = createTestRunner();
+
+        addJsonRecordReaderFactory();
+
+        File file1 = createFile("test_file_1.txt", "test_content_1", 
mainFolderId);
+        File file2 = createFile("test_file_2.txt", "test_content_2", 
mainFolderId);
+
+        String input = "[" +
+                "{" +
+                "\"drive.id\":\"" + file1.getId() + "\"," +
+                "\"filename\":\"" + file1.getName() + "\"" +
+                "}," +
+                "{" +
+                "\"drive.id\":\"" + file2.getId() + "\"," +
+                "\"filename\":\"" + file2.getName() + "\"" +
+                "}" +
+                "]";
+
+        fileIdToThrowException.set(file2.getId());
+
+        Set<Map<String, String>> expectedSuccessAttributes = new 
HashSet<>(Arrays.asList(
+                new HashMap<String, String>() {{
+                    put("drive.id", file1.getId());
+                    put("filename", file1.getName());
+                }}
+        ));
+        List<String> expectedSuccessContents = Arrays.asList("test_content_1");
+
+        Set<Map<String, String>> expectedFailureAttributes = new 
HashSet<>(Arrays.asList(
+                new HashMap<String, String>() {{
+                    put("drive.id", file2.getId());
+                    put("filename", file2.getName());
+                    put(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, "N/A");
+                }}
+        ));
+
+        // WHEN
+        testRunner.enqueue(input);
+        testRunner.run();
+
+        // THEN
+        testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0);
+
+        checkAttributes(FetchGoogleDrive.REL_SUCCESS, 
expectedSuccessAttributes);
+        checkContent(FetchGoogleDrive.REL_SUCCESS, expectedSuccessContents);
+
+        checkAttributes(FetchGoogleDrive.REL_FAILURE, 
expectedFailureAttributes);
+        checkContent(FetchGoogleDrive.REL_FAILURE, Arrays.asList(""));
+    }
+
+    private void addJsonRecordReaderFactory() throws InitializationException {
+        RecordReaderFactory recordReader = new JsonTreeReader();
+        testRunner.addControllerService("record_reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+        testRunner.setProperty(FetchGoogleDrive.RECORD_READER, 
"record_reader");
+    }
+
+    public Set<String> getCheckedAttributeNames() {
+        Set<String> checkedAttributeNames = 
OutputChecker.super.getCheckedAttributeNames();
+
+        checkedAttributeNames.add(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE);
+
+        return checkedAttributeNames;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java
index 6c184bc504..f352539064 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java
@@ -16,27 +16,11 @@
  */
 package org.apache.nifi.processors.gcp.drive;
 
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.AbstractInputStreamContent;
-import com.google.api.client.http.ByteArrayContent;
-import com.google.api.client.http.javanet.NetHttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.gson.GsonFactory;
-import com.google.api.services.drive.Drive;
-import com.google.api.services.drive.DriveScopes;
 import com.google.api.services.drive.model.File;
-import 
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
-import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
-import org.apache.nifi.processors.gcp.util.GoogleUtils;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -47,63 +31,20 @@ import java.util.stream.Collectors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
- * Set the following constants before running:<br />
- * <br />
- * CREDENTIAL_JSON_FILE_PATH - A Service Account credentials JSON file.<br />
- * SHARED_FOLDER_ID - The ID of a Folder that is shared with the Service 
Account. The test will create files and sub-folders within this folder.<br />
- * <br />
- * Created files and folders are cleaned up, but it's advisable to dedicate a 
folder for this test so that it can be cleaned up easily should the test fail 
to do so.
- * <br /><br />
- * WARNING: The creation of a file is not a synchronized operation so tests 
may fail because the processor may not list all of them.
+ * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this 
test.
  */
-public class ListGoogleDriveIT {
-    public static final String CREDENTIAL_JSON_FILE_PATH = "";
-    public static final String SHARED_FOLDER_ID = "";
-
-    public static final JsonFactory JSON_FACTORY = 
GsonFactory.getDefaultInstance();
-
-    private TestRunner testRunner;
-
-    private Drive driveService;
-
-    private String mainFolderId;
-
+public class ListGoogleDriveIT extends AbstractGoogleDriveIT<ListGoogleDrive> {
     @BeforeEach
     public void init() throws Exception {
-        ListGoogleDrive testSubject = new ListGoogleDrive();
-
-        testRunner = TestRunners.newTestRunner(testSubject);
-
-        GCPCredentialsControllerService gcpCredentialsControllerService = new 
GCPCredentialsControllerService();
-        testRunner.addControllerService("gcp_credentials_provider_service", 
gcpCredentialsControllerService);
-        testRunner.setProperty(gcpCredentialsControllerService, 
CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, 
CREDENTIAL_JSON_FILE_PATH);
-        testRunner.enableControllerService(gcpCredentialsControllerService);
-        testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, 
"gcp_credentials_provider_service");
-
-        NetHttpTransport httpTransport = 
GoogleNetHttpTransport.newTrustedTransport();
-
-        driveService = new Drive.Builder(
-                httpTransport,
-                JSON_FACTORY,
-                testSubject.getHttpCredentialsAdapter(
-                        testRunner.getProcessContext(),
-                        DriveScopes.all()
-                )
-        )
-                .setApplicationName(this.getClass().getSimpleName())
-                .build();
-
-        File file = createFolder("main", SHARED_FOLDER_ID);
-
-        mainFolderId = file.getId();
+        super.init();
         testRunner.setProperty(ListGoogleDrive.FOLDER_ID, mainFolderId);
     }
 
-    @AfterEach
-    void tearDown() throws IOException {
-        driveService.files()
-                .delete(mainFolderId)
-                .execute();
+    @Override
+    public ListGoogleDrive createTestSubject() {
+        ListGoogleDrive testSubject = new ListGoogleDrive();
+
+        return testSubject;
     }
 
     @Test
@@ -114,18 +55,18 @@ public class ListGoogleDriveIT {
 
         File main_sub1_sub1 = createFolder("main_sub1_sub1", 
main_sub1.getId());
 
-        createFile("main_file1", mainFolderId);
-        createFile("main_file2", mainFolderId);
-        createFile("main_file3", mainFolderId);
+        createFileWithDefaultContent("main_file1", mainFolderId);
+        createFileWithDefaultContent("main_file2", mainFolderId);
+        createFileWithDefaultContent("main_file3", mainFolderId);
 
-        createFile("main_sub1_file1", main_sub1.getId());
+        createFileWithDefaultContent("main_sub1_file1", main_sub1.getId());
 
-        createFile("main_sub2_file1", main_sub2.getId());
-        createFile("main_sub2_file2", main_sub2.getId());
+        createFileWithDefaultContent("main_sub2_file1", main_sub2.getId());
+        createFileWithDefaultContent("main_sub2_file2", main_sub2.getId());
 
-        createFile("main_sub1_sub1_file1", main_sub1_sub1.getId());
-        createFile("main_sub1_sub1_file2", main_sub1_sub1.getId());
-        createFile("main_sub1_sub1_file3", main_sub1_sub1.getId());
+        createFileWithDefaultContent("main_sub1_sub1_file1", 
main_sub1_sub1.getId());
+        createFileWithDefaultContent("main_sub1_sub1_file2", 
main_sub1_sub1.getId());
+        createFileWithDefaultContent("main_sub1_sub1_file3", 
main_sub1_sub1.getId());
 
         Set<String> expectedFileNames = new HashSet<>(Arrays.asList(
                 "main_file1", "main_file2", "main_file3",
@@ -179,7 +120,7 @@ public class ListGoogleDriveIT {
         // GIVEN
         testRunner.setProperty(ListGoogleDrive.MIN_AGE, "15 s");
 
-        createFile("main_file", mainFolderId);
+        createFileWithDefaultContent("main_file", mainFolderId);
 
         // Make sure the file 'arrives' and could be listed
         Thread.sleep(5000);
@@ -218,39 +159,4 @@ public class ListGoogleDriveIT {
         assertEquals(expectedFileNames, actualFileNames);
     }
 
-    private File createFolder(String folderName, String... parentFolderIds) 
throws IOException {
-        File fileMetaData = new File();
-        fileMetaData.setName(folderName);
-
-        if (parentFolderIds != null) {
-            fileMetaData.setParents(Arrays.asList(parentFolderIds));
-        }
-
-        fileMetaData.setMimeType("application/vnd.google-apps.folder");
-
-        Drive.Files.Create create = driveService.files()
-                .create(fileMetaData)
-                .setFields("id");
-
-        File file = create.execute();
-
-        return file;
-    }
-
-    private File createFile(String name, String... folderIds) throws 
IOException {
-        File fileMetadata = new File();
-        fileMetadata.setName(name);
-        fileMetadata.setParents(Arrays.asList(folderIds));
-
-        AbstractInputStreamContent content = new 
ByteArrayContent("text/plain", "test_content".getBytes(StandardCharsets.UTF_8));
-
-        Drive.Files.Create create = driveService.files()
-                .create(fileMetadata, content)
-                .setFields("id, modifiedTime");
-
-        File file = create.execute();
-
-        return file;
-    }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveSimpleTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveSimpleTest.java
new file mode 100644
index 0000000000..e29fc62a57
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveSimpleTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.EqualsWrapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.nifi.util.EqualsWrapper.wrapList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ListGoogleDriveSimpleTest {
+    private ListGoogleDrive testSubject;
+
+    private ProcessContext mockProcessContext;
+    private Drive mockDriverService;
+
+    private String listingModeAsString = "EXECUTION";
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mockProcessContext = mock(ProcessContext.class, RETURNS_DEEP_STUBS);
+        mockDriverService = mock(Drive.class, RETURNS_DEEP_STUBS);
+
+        testSubject = new ListGoogleDrive() {
+            @Override
+            protected List<GoogleDriveFileInfo> performListing(ProcessContext 
context, Long minTimestamp, ListingMode ignoredListingMode) throws IOException {
+                ListingMode acquiredListingMode = 
ListingMode.valueOf(listingModeAsString);
+
+                return super.performListing(context, minTimestamp, 
acquiredListingMode);
+            }
+
+            @Override
+            public Drive createDriveService(ProcessContext context, 
HttpTransport httpTransport, String... scopes) {
+                return mockDriverService;
+            }
+        };
+
+        testSubject.onScheduled(mockProcessContext);
+    }
+
+    @Test
+    void testCreatedListableEntityContainsCorrectData() throws Exception {
+        // GIVEN
+        Long minTimestamp = 0L;
+        listingModeAsString = "EXECUTION";
+
+        String id = "id_1";
+        String filename = "file_name_1";
+        Long size = 125L;
+        long createdTime = 123456L;
+        long modifiedTime = 234567L;
+        String mimeType = "mime_type_1";
+
+        when(mockDriverService.files()
+                .list()
+                .setQ("('null' in parents) and (mimeType != 
'application/vnd.google-apps.folder') and (mimeType != 
'application/vnd.google-apps.shortcut') and trashed = false")
+                .setPageToken(null)
+                .setFields("nextPageToken, files(id, name, size, createdTime, 
modifiedTime, mimeType)")
+                .execute()
+                .getFiles()
+        ).thenReturn(Arrays.asList(
+                createFile(
+                        id,
+                        filename,
+                        size,
+                        new DateTime(createdTime),
+                        new DateTime(modifiedTime),
+                        mimeType
+                )
+        ));
+
+        List<GoogleDriveFileInfo> expected = Arrays.asList(
+                new GoogleDriveFileInfo.Builder()
+                        .id(id)
+                        .fileName(filename)
+                        .size(size)
+                        .createdTime(createdTime)
+                        .modifiedTime(modifiedTime)
+                        .mimeType(mimeType)
+                        .build()
+        );
+
+        // WHEN
+        List<GoogleDriveFileInfo> actual = 
testSubject.performListing(mockProcessContext, minTimestamp, null);
+
+        // THEN
+        List<Function<GoogleDriveFileInfo, Object>> propertyProviders = 
Arrays.asList(
+                GoogleDriveFileInfo::getId,
+                GoogleDriveFileInfo::getIdentifier,
+                GoogleDriveFileInfo::getName,
+                GoogleDriveFileInfo::getSize,
+                GoogleDriveFileInfo::getTimestamp,
+                GoogleDriveFileInfo::getCreatedTime,
+                GoogleDriveFileInfo::getModifiedTime,
+                GoogleDriveFileInfo::getMimeType
+        );
+
+        List<EqualsWrapper<GoogleDriveFileInfo>> expectedWrapper = 
wrapList(expected, propertyProviders);
+        List<EqualsWrapper<GoogleDriveFileInfo>> actualWrapper = 
wrapList(actual, propertyProviders);
+
+        assertEquals(expectedWrapper, actualWrapper);
+    }
+
+    private File createFile(
+            String id,
+            String name,
+            Long size,
+            DateTime createdTime,
+            DateTime modifiedTime,
+            String mimeType
+    ) {
+        File file = new File();
+
+        file
+                .setId(id)
+                .setName(name)
+                .setMimeType(mimeType)
+                .setCreatedTime(createdTime)
+                .setModifiedTime(modifiedTime)
+                .setSize(size);
+
+        return file;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java
new file mode 100644
index 0000000000..c6d2ab4261
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.util.GoogleUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ListGoogleDriverTestRunnerTest implements OutputChecker {
+    private ListGoogleDrive testSubject;
+    private TestRunner testRunner;
+
+    private Drive mockDriverService;
+
+    private String folderId = "folderId";
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mockDriverService = mock(Drive.class, Mockito.RETURNS_DEEP_STUBS);
+
+        testSubject = new ListGoogleDrive() {
+            @Override
+            protected List<GoogleDriveFileInfo> performListing(ProcessContext 
context, Long minTimestamp, ListingMode ignoredListingMode) throws IOException {
+                return super.performListing(context, minTimestamp, 
ListingMode.EXECUTION);
+            }
+
+            @Override
+            public Drive createDriveService(ProcessContext context, 
HttpTransport httpTransport, String... scopes) {
+                return mockDriverService;
+            }
+        };
+
+        testRunner = TestRunners.newTestRunner(testSubject);
+
+        String gcpCredentialsControllerServiceId = 
"gcp_credentials_provider_service";
+
+        GCPCredentialsControllerService gcpCredentialsControllerService = 
mock(GCPCredentialsControllerService.class, RETURNS_DEEP_STUBS);
+        
when(gcpCredentialsControllerService.getIdentifier()).thenReturn(gcpCredentialsControllerServiceId);
+
+        testRunner.addControllerService(gcpCredentialsControllerServiceId, 
gcpCredentialsControllerService);
+        testRunner.enableControllerService(gcpCredentialsControllerService);
+        testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, 
gcpCredentialsControllerServiceId);
+
+        testRunner.setProperty(ListGoogleDrive.FOLDER_ID, folderId);
+    }
+
+    @Test
+    void testOutputAsAttributesWhereTimestampIsCreatedTime() throws Exception {
+        // GIVEN
+        String id = "id_1";
+        String filename = "file_name_1";
+        Long size = 125L;
+        Long createdTime = 123456L;
+        Long modifiedTime = null;
+        String mimeType = "mime_type_1";
+
+        // WHEN
+        // THEN
+        testOutputAsAttributes(id, filename, size, createdTime, modifiedTime, 
mimeType, createdTime);
+    }
+
+    @Test
+    void testOutputAsAttributesWhereTimestampIsModifiedTime() throws Exception 
{
+        // GIVEN
+        String id = "id_1";
+        String filename = "file_name_1";
+        Long size = 125L;
+        Long createdTime = 123456L;
+        Long modifiedTime = 123456L + 1L;
+        String mimeType = "mime_type_1";
+
+        // WHEN
+        // THEN
+        testOutputAsAttributes(id, filename, size, createdTime, modifiedTime, 
mimeType, modifiedTime);
+    }
+
+    @Test
+    void testOutputAsContent() throws Exception {
+        // GIVEN
+        String id = "id_1";
+        String filename = "file_name_1";
+        Long size = 125L;
+        Long createdTime = 123456L;
+        Long modifiedTime = 123456L + 1L;
+        String mimeType = "mime_type_1";
+
+        addJsonRecordWriterFactory();
+
+        mockFetchedGoogleDriveFileList(id, filename, size, createdTime, 
modifiedTime, mimeType);
+
+        List<String> expectedContents = Arrays.asList(
+                "[" +
+                        "{" +
+                        "\"drive.id\":\"" + id + "\"," +
+                        "\"filename\":\"" + filename + "\"," +
+                        "\"drive.size\":" + size + "," +
+                        "\"drive.timestamp\":" + modifiedTime + "," +
+                        "\"mime.type\":\"" + mimeType + "\"" +
+                        "}" +
+                        "]");
+
+        // WHEN
+        testRunner.run();
+
+        // THEN
+        checkContent(ListGoogleDrive.REL_SUCCESS, expectedContents);
+    }
+
+    private void addJsonRecordWriterFactory() throws InitializationException {
+        RecordSetWriterFactory recordSetWriter = new JsonRecordSetWriter();
+        testRunner.addControllerService("record_writer", recordSetWriter);
+        testRunner.enableControllerService(recordSetWriter);
+        testRunner.setProperty(ListGoogleDrive.RECORD_WRITER, "record_writer");
+    }
+
+    private void mockFetchedGoogleDriveFileList(String id, String filename, 
Long size, Long createdTime, Long modifiedTime, String mimeType) throws 
IOException {
+        when(mockDriverService.files()
+                .list()
+                .setQ("('" + folderId + "' in parents) and (mimeType != 
'application/vnd.google-apps.folder') and (mimeType != 
'application/vnd.google-apps.shortcut') and trashed = false")
+                .setPageToken(null)
+                .setFields("nextPageToken, files(id, name, size, createdTime, 
modifiedTime, mimeType)")
+                .execute()
+                .getFiles()
+        ).thenReturn(Arrays.asList(
+                createFile(
+                        id,
+                        filename,
+                        size,
+                        
Optional.ofNullable(createdTime).map(DateTime::new).orElse(null),
+                        
Optional.ofNullable(modifiedTime).map(DateTime::new).orElse(null),
+                        mimeType
+                )
+        ));
+    }
+
+    private void testOutputAsAttributes(String id, String filename, Long size, 
Long createdTime, Long modifiedTime, String mimeType, Long expectedTimestamp) 
throws IOException {
+        // GIVEN
+        mockFetchedGoogleDriveFileList(id, filename, size, createdTime, 
modifiedTime, mimeType);
+
+        Map<String, String> inputFlowFileAttributes = new HashMap<>();
+        inputFlowFileAttributes.put("drive.id", id);
+        inputFlowFileAttributes.put("filename", filename);
+        inputFlowFileAttributes.put("drive.size", "" + size);
+        inputFlowFileAttributes.put("drive.timestamp", "" + expectedTimestamp);
+        inputFlowFileAttributes.put("mime.type", mimeType);
+
+        HashSet<Map<String, String>> expectedAttributes = new 
HashSet<>(Arrays.asList(inputFlowFileAttributes));
+
+        // WHEN
+        testRunner.run();
+
+        // THEN
+        checkAttributes(ListGoogleDrive.REL_SUCCESS, expectedAttributes);
+    }
+
+    private File createFile(
+            String id,
+            String name,
+            Long size,
+            DateTime createdTime,
+            DateTime modifiedTime,
+            String mimeType
+    ) {
+        File file = new File();
+
+        file
+                .setId(id)
+                .setName(name)
+                .setMimeType(mimeType)
+                .setCreatedTime(createdTime)
+                .setModifiedTime(modifiedTime)
+                .setSize(size);
+
+        return file;
+    }
+
+    @Override
+    public TestRunner getTestRunner() {
+        return testRunner;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/OutputChecker.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/OutputChecker.java
new file mode 100644
index 0000000000..1c49f6ecfe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/OutputChecker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public interface OutputChecker {
+    TestRunner getTestRunner();
+
+    default void checkAttributes(Relationship relationship, Set<Map<String, 
String>> expectedAttributes) {
+        getTestRunner().assertTransferCount(relationship, 
expectedAttributes.size());
+        List<MockFlowFile> flowFiles = 
getTestRunner().getFlowFilesForRelationship(relationship);
+
+        Set<String> checkedAttributeNames = getCheckedAttributeNames();
+
+        Set<Map<String, String>> actualAttributes = flowFiles.stream()
+                .map(flowFile -> flowFile.getAttributes().entrySet().stream()
+                        .filter(attributeNameAndValue -> 
checkedAttributeNames.contains(attributeNameAndValue.getKey()))
+                        .filter(entry -> entry.getKey() != null && 
entry.getValue() != null)
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue))
+
+                )
+                .collect(Collectors.toSet());
+
+        assertEquals(expectedAttributes, actualAttributes);
+    }
+
+    default Set<String> getCheckedAttributeNames() {
+        Set<String> checkedAttributeNames = 
Arrays.stream(GoogleDriveFlowFileAttribute.values())
+                .map(GoogleDriveFlowFileAttribute::getName)
+                .collect(Collectors.toSet());
+
+        return checkedAttributeNames;
+    }
+
+    default void checkContent(Relationship relationship, List<String> 
expectedContent) {
+        getTestRunner().assertTransferCount(relationship, 
expectedContent.size());
+        List<MockFlowFile> flowFiles = 
getTestRunner().getFlowFilesForRelationship(relationship);
+
+        List<String> actualContent = flowFiles.stream()
+                .map(flowFile -> flowFile.getContent())
+                .collect(Collectors.toList());
+
+        assertEquals(expectedContent, actualContent);
+    }
+}

Reply via email to