This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 2fae3bb9a8 NIFI-10281 - Added FetchGoogleDrive processor. Consolidated record- and attribute output for ListGoogleDrive. (Record fields and flowfile attributes are now the same.) 2fae3bb9a8 is described below commit 2fae3bb9a8a8c046b9430469b16d66044a39d9dc Author: Tamas Palfy <tpa...@apache.org> AuthorDate: Tue Jul 26 20:00:23 2022 +0200 NIFI-10281 - Added FetchGoogleDrive processor. Consolidated record- and attribute output for ListGoogleDrive. (Record fields and flowfile attributes are now the same.) This closes #6248. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../processors/gcp/drive/FetchGoogleDrive.java | 238 ++++++++++++++ .../processors/gcp/drive/GoogleDriveFileInfo.java | 21 +- .../gcp/drive/GoogleDriveFlowFileAttribute.java | 56 ++++ .../processors/gcp/drive/GoogleDriveTrait.java | 69 ++++ .../nifi/processors/gcp/drive/ListGoogleDrive.java | 76 ++--- .../services/org.apache.nifi.processor.Processor | 3 +- .../gcp/drive/AbstractGoogleDriveIT.java | 153 +++++++++ .../processors/gcp/drive/FetchGoogleDriveIT.java | 361 +++++++++++++++++++++ .../processors/gcp/drive/ListGoogleDriveIT.java | 130 +------- .../gcp/drive/ListGoogleDriveSimpleTest.java | 152 +++++++++ .../gcp/drive/ListGoogleDriverTestRunnerTest.java | 219 +++++++++++++ .../nifi/processors/gcp/drive/OutputChecker.java | 70 ++++ 12 files changed, 1371 insertions(+), 177 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java new file mode 100644 index 0000000000..9804999854 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.drive.Drive; +import com.google.api.services.drive.DriveScopes; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; +import org.apache.nifi.processors.gcp.util.GoogleUtils; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"google", "drive", "storage", "fetch"}) +@CapabilityDescription("Fetches files from a Google Drive Folder. Designed to be used in tandem with ListGoogleDrive.") +@SeeAlso({ListGoogleDrive.class}) +@WritesAttributes({ + @WritesAttribute(attribute = FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, description = "The error code returned by Google Drive when the fetch of a file fails"), + @WritesAttribute(attribute = FetchGoogleDrive.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by Google Drive when the fetch of a file fails") +}) +public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait { + public static final String ERROR_CODE_ATTRIBUTE = "error.code"; + public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message"; + + public static final PropertyDescriptor FILE_ID = new PropertyDescriptor + .Builder().name("drive-file-id") + .displayName("File ID") + .description("The Drive ID of the File to fetch") + .required(true) + .defaultValue("${drive.id}") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming Google Driver File meta-data as NiFi Records." + + " If not set, the Processor expects as attributes of a separate flowfile for each File to fetch.") + .identifiesControllerService(RecordReaderFactory.class) + .required(false) + .build(); + + public static final Relationship REL_SUCCESS = + new Relationship.Builder() + .name("success") + .description("A flowfile will be routed here for each successfully fetched File.") + .build(); + + public static final Relationship REL_FAILURE = + new Relationship.Builder().name("failure") + .description("A flowfile will be routed here for each File for which fetch was attempted but failed.") + .build(); + + public static final Relationship REL_INPUT_FAILURE = + new Relationship.Builder().name("input_failure") + .description("The incoming flowfile will be routed here if it's content could not be processed.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, + FILE_ID, + RECORD_READER, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) + )); + + public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE, + REL_INPUT_FAILURE + ))); + + private volatile Drive driveService; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); + + driveService = createDriveService( + context, + new ProxyAwareTransportFactory(proxyConfiguration).create(), + DriveScopes.DRIVE, DriveScopes.DRIVE_FILE + ); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + if (context.getProperty(RECORD_READER).isSet()) { + RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + + try (InputStream inFlowFile = session.read(flowFile)) { + final Map<String, String> flowFileAttributes = flowFile.getAttributes(); + final RecordReader reader = recordReaderFactory.createRecordReader(flowFileAttributes, inFlowFile, flowFile.getSize(), getLogger()); + + Record record; + while ((record = reader.nextRecord()) != null) { + String fileId = record.getAsString(GoogleDriveFileInfo.ID); + FlowFile outFlowFile = session.create(flowFile); + try { + addAttributes(session, outFlowFile, record); + + fetchFile(fileId, session, outFlowFile); + + session.transfer(outFlowFile, REL_SUCCESS); + } catch (GoogleJsonResponseException e) { + handleErrorResponse(session, fileId, outFlowFile, e); + } catch (Exception e) { + handleUnexpectedError(session, outFlowFile, fileId, e); + } + } + session.remove(flowFile); + } catch (IOException | MalformedRecordException | SchemaNotFoundException e) { + getLogger().error("Couldn't read file metadata content as records from incoming flowfile", e); + + session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + + session.transfer(flowFile, REL_INPUT_FAILURE); + } catch (Exception e) { + getLogger().error("Unexpected error while processing incoming flowfile", e); + + session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + + session.transfer(flowFile, REL_INPUT_FAILURE); + } + } else { + String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue(); + FlowFile outFlowFile = flowFile; + try { + fetchFile(fileId, session, outFlowFile); + + session.transfer(outFlowFile, REL_SUCCESS); + } catch (GoogleJsonResponseException e) { + handleErrorResponse(session, fileId, flowFile, e); + } catch (Exception e) { + handleUnexpectedError(session, flowFile, fileId, e); + } + } + session.commitAsync(); + } + + private void addAttributes(ProcessSession session, FlowFile outFlowFile, Record record) { + Map<String, String> attributes = new HashMap<>(); + + for (GoogleDriveFlowFileAttribute attribute : GoogleDriveFlowFileAttribute.values()) { + Optional.ofNullable(attribute.getValue(record)) + .ifPresent(value -> attributes.put(attribute.getName(), value)); + } + + session.putAllAttributes(outFlowFile, attributes); + } + + void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException { + InputStream driveFileInputStream = driveService + .files() + .get(fileId) + .executeMediaAsInputStream(); + + session.importFrom(driveFileInputStream, outFlowFile); + } + + private void handleErrorResponse(ProcessSession session, String fileId, FlowFile outFlowFile, GoogleJsonResponseException e) { + getLogger().error("Couldn't fetch file with id '{}'", fileId, e); + + session.putAttribute(outFlowFile, ERROR_CODE_ATTRIBUTE, "" + e.getStatusCode()); + session.putAttribute(outFlowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + + session.transfer(outFlowFile, REL_FAILURE); + } + + private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) { + getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e); + + session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, "N/A"); + session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + + session.transfer(flowFile, REL_FAILURE); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java index 6395757f0b..f4a1a0a3cf 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFileInfo.java @@ -30,23 +30,23 @@ import java.util.List; import java.util.Map; public class GoogleDriveFileInfo implements ListableEntity { + public static final String ID = "drive.id"; + public static final String FILENAME = "filename"; + public static final String SIZE = "drive.size"; + public static final String TIMESTAMP = "drive.timestamp"; + public static final String MIME_TYPE = "mime.type"; - private static final RecordSchema SCHEMA; - private static final String ID = "id"; - private static final String FILENAME = "filename"; - private static final String SIZE = "size"; - private static final String CREATED_TIME = "createdTime"; - private static final String MODIFIED_TIME = "modifiedTime"; - private static final String MIME_TYPE = "mimeType"; + private static final RecordSchema SCHEMA; static { final List<RecordField> recordFields = new ArrayList<>(); + recordFields.add(new RecordField(ID, RecordFieldType.STRING.getDataType(), false)); recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false)); recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false)); - recordFields.add(new RecordField(CREATED_TIME, RecordFieldType.TIMESTAMP.getDataType(), false)); - recordFields.add(new RecordField(MODIFIED_TIME, RecordFieldType.TIMESTAMP.getDataType(), false)); + recordFields.add(new RecordField(TIMESTAMP, RecordFieldType.LONG.getDataType(), false)); recordFields.add(new RecordField(MIME_TYPE, RecordFieldType.STRING.getDataType())); + SCHEMA = new SimpleRecordSchema(recordFields); } @@ -84,8 +84,7 @@ public class GoogleDriveFileInfo implements ListableEntity { values.put(ID, getId()); values.put(FILENAME, getName()); values.put(SIZE, getSize()); - values.put(CREATED_TIME, getCreatedTime()); - values.put(MODIFIED_TIME, getModifiedTime()); + values.put(TIMESTAMP, getTimestamp()); values.put(MIME_TYPE, getMimeType()); return new MapRecord(SCHEMA, values); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java new file mode 100644 index 0000000000..19f73fa398 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveFlowFileAttribute.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import org.apache.nifi.serialization.record.Record; + +import java.util.Optional; +import java.util.function.Function; + +public enum GoogleDriveFlowFileAttribute { + ID(GoogleDriveFileInfo.ID, GoogleDriveFileInfo::getId), + FILE_NAME(GoogleDriveFileInfo.FILENAME, GoogleDriveFileInfo::getName), + SIZE(GoogleDriveFileInfo.SIZE, fileInfo -> Optional.ofNullable(fileInfo.getSize()) + .map(String::valueOf) + .orElse(null) + ), + TIME_STAMP(GoogleDriveFileInfo.TIMESTAMP, fileInfo -> Optional.ofNullable(fileInfo.getTimestamp()) + .map(String::valueOf) + .orElse(null) + ), + MIME_TYPE(GoogleDriveFileInfo.MIME_TYPE, GoogleDriveFileInfo::getMimeType); + + private final String name; + private final Function<GoogleDriveFileInfo, String> fromFileInfo; + + GoogleDriveFlowFileAttribute(String attributeName, Function<GoogleDriveFileInfo, String> fromFileInfo) { + this.name = attributeName; + this.fromFileInfo = fromFileInfo; + } + + public String getName() { + return name; + } + + public String getValue(Record record) { + return record.getAsString(name); + } + + public String getValue(GoogleDriveFileInfo fileInfo) { + return fromFileInfo.apply(fileInfo); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java new file mode 100644 index 0000000000..5794c0b598 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveTrait.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.services.drive.Drive; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.GoogleCredentials; +import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.gcp.util.GoogleUtils; + +import java.util.Arrays; +import java.util.Collection; + +public interface GoogleDriveTrait { + String APPLICATION_NAME = "NiFi"; + + JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + + default Drive createDriveService(ProcessContext context, HttpTransport httpTransport, String... scopes) { + Drive driveService = new Drive.Builder( + httpTransport, + JSON_FACTORY, + getHttpCredentialsAdapter( + context, + Arrays.asList(scopes) + ) + ) + .setApplicationName(APPLICATION_NAME) + .build(); + + return driveService; + } + + default HttpCredentialsAdapter getHttpCredentialsAdapter( + final ProcessContext context, + final Collection<String> scopes + ) { + GoogleCredentials googleCredentials = getGoogleCredentials(context); + + HttpCredentialsAdapter httpCredentialsAdapter = new HttpCredentialsAdapter(googleCredentials.createScoped(scopes)); + + return httpCredentialsAdapter; + } + + default GoogleCredentials getGoogleCredentials(final ProcessContext context) { + final GCPCredentialsService gcpCredentialsService = context.getProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE) + .asControllerService(GCPCredentialsService.class); + + return gcpCredentialsService.getGoogleCredentials(); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java index b141f1ca53..e5fcbf8f20 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/ListGoogleDrive.java @@ -17,14 +17,11 @@ package org.apache.nifi.processors.gcp.drive; import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.gson.GsonFactory; +import com.google.api.client.util.DateTime; import com.google.api.services.drive.Drive; import com.google.api.services.drive.DriveScopes; import com.google.api.services.drive.model.File; import com.google.api.services.drive.model.FileList; -import com.google.auth.http.HttpCredentialsAdapter; -import com.google.auth.oauth2.GoogleCredentials; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -33,6 +30,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -41,7 +39,6 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.list.AbstractListProcessor; @@ -64,6 +61,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; @PrimaryNodeOnly @@ -74,24 +72,20 @@ import java.util.concurrent.TimeUnit; "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " + "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " + "previous node left off without duplicating all of the data.") +@SeeAlso({FetchGoogleDrive.class}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@WritesAttributes({@WritesAttribute(attribute = "drive.id", description = "The id of the file"), - @WritesAttribute(attribute = "filename", description = "The name of the file"), - @WritesAttribute(attribute = "drive.size", description = "The size of the file"), - @WritesAttribute(attribute = "drive.timestamp", description = "The last modified time or created time (whichever is greater) of the file." + +@WritesAttributes({@WritesAttribute(attribute = GoogleDriveFileInfo.ID, description = "The id of the file"), + @WritesAttribute(attribute = GoogleDriveFileInfo.FILENAME, description = "The name of the file"), + @WritesAttribute(attribute = GoogleDriveFileInfo.SIZE, description = "The size of the file"), + @WritesAttribute(attribute = GoogleDriveFileInfo.TIMESTAMP, description = "The last modified time or created time (whichever is greater) of the file." + " The reason for this is that the original modified date of a file is preserved when uploaded to Google Drive." + " 'Created time' takes the time when the upload occurs. However uploaded files can still be modified later."), - @WritesAttribute(attribute = "mime.type", description = "MimeType of the file")}) + @WritesAttribute(attribute = GoogleDriveFileInfo.MIME_TYPE, description = "MimeType of the file")}) @Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already." + " What exactly needs to be stored depends on the 'Listing Strategy'." + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up" + " where the previous node left off, without duplicating the data.") -public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> { - private static final String APPLICATION_NAME = "NiFi"; - private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); - - private volatile HttpTransport httpTransport; - +public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> implements GoogleDriveTrait { public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder() .name("folder-id") .displayName("Folder ID") @@ -156,6 +150,8 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) )); + private volatile Drive driveService; + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return PROPERTIES; @@ -171,11 +167,11 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> final ProcessContext context ) { final Map<String, String> attributes = new HashMap<>(); - attributes.put("drive.id", entity.getId()); - attributes.put("filename", entity.getName()); - attributes.put("drive.size", String.valueOf(entity.getSize())); - attributes.put("drive.timestamp", String.valueOf(entity.getTimestamp())); - attributes.put("mime.type", entity.getMimeType()); + + for (GoogleDriveFlowFileAttribute attribute : GoogleDriveFlowFileAttribute.values()) { + Optional.ofNullable(attribute.getValue(entity)) + .ifPresent(value -> attributes.put(attribute.getName(), value)); + } return attributes; } @@ -183,7 +179,10 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> @OnScheduled public void onScheduled(final ProcessContext context) throws IOException { final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); - httpTransport = new ProxyAwareTransportFactory(proxyConfiguration).create(); + + HttpTransport httpTransport = new ProxyAwareTransportFactory(proxyConfiguration).create(); + + driveService = createDriveService(context, httpTransport, DriveScopes.DRIVE_METADATA_READONLY); } @Override @@ -230,17 +229,6 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> final Boolean recursive = context.getProperty(RECURSIVE_SEARCH).asBoolean(); final Long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); - Drive driveService = new Drive.Builder( - this.httpTransport, - JSON_FACTORY, - getHttpCredentialsAdapter( - context, - Arrays.asList(DriveScopes.DRIVE_METADATA_READONLY) - ) - ) - .setApplicationName(APPLICATION_NAME) - .build(); - StringBuilder queryBuilder = new StringBuilder(); queryBuilder.append(buildQueryForDirs(driveService, folderId, recursive)); queryBuilder.append(" and (mimeType != 'application/vnd.google-apps.folder')"); @@ -276,8 +264,8 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> .id(file.getId()) .fileName(file.getName()) .size(file.getSize()) - .createdTime(file.getCreatedTime().getValue()) - .modifiedTime(file.getModifiedTime().getValue()) + .createdTime(Optional.ofNullable(file.getCreatedTime()).map(DateTime::getValue).orElse(0L)) + .modifiedTime(Optional.ofNullable(file.getModifiedTime()).map(DateTime::getValue).orElse(0L)) .mimeType(file.getMimeType()); listing.add(builder.build()); @@ -294,24 +282,6 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> return performListing(context, null, ListingMode.CONFIGURATION_VERIFICATION).size(); } - HttpCredentialsAdapter getHttpCredentialsAdapter( - final ProcessContext context, - final Collection<String> scopes - ) { - GoogleCredentials googleCredentials = getGoogleCredentials(context); - - HttpCredentialsAdapter httpCredentialsAdapter = new HttpCredentialsAdapter(googleCredentials.createScoped(scopes)); - - return httpCredentialsAdapter; - } - - private GoogleCredentials getGoogleCredentials(final ProcessContext context) { - final GCPCredentialsService gcpCredentialsService = context.getProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE) - .asControllerService(GCPCredentialsService.class); - - return gcpCredentialsService.getGoogleCredentials(); - } - private static String buildQueryForDirs( final Drive service, final String folderId, diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 32001116d7..799e648ff3 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -22,4 +22,5 @@ org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming -org.apache.nifi.processors.gcp.drive.ListGoogleDrive \ No newline at end of file +org.apache.nifi.processors.gcp.drive.ListGoogleDrive +org.apache.nifi.processors.gcp.drive.FetchGoogleDrive \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java new file mode 100644 index 0000000000..0176803602 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.AbstractInputStreamContent; +import com.google.api.client.http.ByteArrayContent; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.services.drive.Drive; +import com.google.api.services.drive.DriveScopes; +import com.google.api.services.drive.model.File; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.processors.gcp.util.GoogleUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * Set the following constants before running:<br /> + * <br /> + * CREDENTIAL_JSON_FILE_PATH - A Service Account credentials JSON file.<br /> + * SHARED_FOLDER_ID - The ID of a Folder that is shared with the Service Account. The test will create files and sub-folders within this folder.<br /> + * <br /> + * Created files and folders are cleaned up, but it's advisable to dedicate a folder for this test so that it can be cleaned up easily should the test fail to do so. + * <br /><br /> + * WARNING: The creation of a file is not a synchronized operation, may need to adjust tests accordingly! + */ +public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Processor> { + private static final String CREDENTIAL_JSON_FILE_PATH = ""; + private static final String SHARED_FOLDER_ID = ""; + + protected static final String DEFAULT_FILE_CONTENT = "test_content"; + + private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + + protected T testSubject; + protected TestRunner testRunner; + + protected Drive driveService; + + protected String mainFolderId; + + protected abstract T createTestSubject(); + + @BeforeEach + protected void init() throws Exception { + testSubject = createTestSubject(); + testRunner = createTestRunner(); + + NetHttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); + + driveService = new Drive.Builder( + httpTransport, + JSON_FACTORY, + testSubject.getHttpCredentialsAdapter( + testRunner.getProcessContext(), + DriveScopes.all() + ) + ) + .setApplicationName(this.getClass().getSimpleName()) + .build(); + + File mainFolder = createFolder("main", SHARED_FOLDER_ID); + mainFolderId = mainFolder.getId(); + } + + @AfterEach + protected void tearDown() throws Exception { + if (driveService != null) { + driveService.files() + .delete(mainFolderId) + .execute(); + } + } + + protected TestRunner createTestRunner() throws Exception { + TestRunner testRunner = TestRunners.newTestRunner(testSubject); + + GCPCredentialsControllerService gcpCredentialsControllerService = new GCPCredentialsControllerService(); + testRunner.addControllerService("gcp_credentials_provider_service", gcpCredentialsControllerService); + testRunner.setProperty(gcpCredentialsControllerService, CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, CREDENTIAL_JSON_FILE_PATH); + testRunner.enableControllerService(gcpCredentialsControllerService); + testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp_credentials_provider_service"); + + return testRunner; + } + + protected File createFolder(String folderName, String... parentFolderIds) throws IOException { + File fileMetaData = new File(); + fileMetaData.setName(folderName); + + if (parentFolderIds != null) { + fileMetaData.setParents(Arrays.asList(parentFolderIds)); + } + + fileMetaData.setMimeType("application/vnd.google-apps.folder"); + + Drive.Files.Create create = driveService.files() + .create(fileMetaData) + .setFields("id"); + + File file = create.execute(); + + return file; + } + + protected File createFileWithDefaultContent(String name, String... folderIds) throws IOException { + return createFile(name, DEFAULT_FILE_CONTENT, folderIds); + } + + protected File createFile(String name, String fileContent, String... folderIds) throws IOException { + File fileMetadata = new File(); + fileMetadata.setName(name); + fileMetadata.setParents(Arrays.asList(folderIds)); + + AbstractInputStreamContent content = new ByteArrayContent("text/plain", fileContent.getBytes(StandardCharsets.UTF_8)); + + Drive.Files.Create create = driveService.files() + .create(fileMetadata, content) + .setFields("id, name, modifiedTime"); + + File file = create.execute(); + + return file; + } + + public TestRunner getTestRunner() { + return testRunner; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java new file mode 100644 index 0000000000..48487b2609 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import com.google.api.services.drive.model.File; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.util.MockFlowFile; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test. + */ +public class FetchGoogleDriveIT extends AbstractGoogleDriveIT<FetchGoogleDrive> implements OutputChecker { + @Override + public FetchGoogleDrive createTestSubject() { + FetchGoogleDrive testSubject = new FetchGoogleDrive(); + + return testSubject; + } + + @Test + void testFetchSingleFileByInputAttributes() throws Exception { + // GIVEN + File file = createFileWithDefaultContent("test_file.txt", mainFolderId); + + Map<String, String> inputFlowFileAttributes = new HashMap<>(); + inputFlowFileAttributes.put("drive.id", file.getId()); + inputFlowFileAttributes.put("filename", file.getName()); + + HashSet<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(inputFlowFileAttributes)); + List<String> expectedContent = Arrays.asList(DEFAULT_FILE_CONTENT); + + // WHEN + testRunner.enqueue("unimportant_data", inputFlowFileAttributes); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); + testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); + + checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes); + checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent); + } + + @Test + void testInputFlowFileReferencesMissingFile() throws Exception { + // GIVEN + Map<String, String> inputFlowFileAttributes = new HashMap<>(); + inputFlowFileAttributes.put("drive.id", "missing"); + inputFlowFileAttributes.put("filename", "missing_filename"); + + Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList( + new HashMap<String, String>() {{ + put("drive.id", "missing"); + put("filename", "missing_filename"); + put("error.code", "404"); + }} + )); + + // WHEN + testRunner.enqueue("unimportant_data", inputFlowFileAttributes); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); + testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); + + checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); + } + + @Test + void testInputFlowFileThrowsExceptionBeforeFetching() throws Exception { + // GIVEN + File file = createFileWithDefaultContent("test_file.txt", mainFolderId); + + Map<String, String> inputFlowFileAttributes = new HashMap<>(); + inputFlowFileAttributes.put("drive.id", file.getId()); + inputFlowFileAttributes.put("filename", file.getName()); + + MockFlowFile input = new MockFlowFile(1) { + AtomicBoolean throwException = new AtomicBoolean(true); + + @Override + public boolean isPenalized() { + // We want to throw exception only once because the exception handling itself calls this again + if (throwException.get()) { + throwException.set(false); + throw new RuntimeException("Intentional exception"); + } else { + return super.isPenalized(); + } + } + + @Override + public Map<String, String> getAttributes() { + return inputFlowFileAttributes; + } + }; + + Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList( + new HashMap<String, String>() {{ + putAll(inputFlowFileAttributes); + put("error.code", "N/A"); + }} + )); + + // WHEN + testRunner.enqueue(input); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); + testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); + + checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); + } + + @Test + void testFetchMultipleFilesByInputRecords() throws Exception { + // GIVEN + addJsonRecordReaderFactory(); + + File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId); + File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId); + + String input = "[" + + "{" + + "\"drive.id\":\"" + file1.getId() + "\"," + + "\"filename\":\"" + file1.getName() + "\"" + + "}," + + "{" + + "\"drive.id\":\"" + file2.getId() + "\"," + + "\"filename\":\"" + file2.getName() + "\"" + + "}" + + "]"; + + List<String> expectedContent = Arrays.asList( + "test_content_1", + "test_content_2" + ); + + Set<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList( + new HashMap<String, String>() {{ + put("drive.id", "" + file1.getId()); + put("filename", file1.getName()); + }}, + new HashMap<String, String>() {{ + put("drive.id", "" + file2.getId()); + put("filename", file2.getName()); + }} + )); + + // WHEN + testRunner.enqueue(input); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); + testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); + + checkContent(FetchGoogleDrive.REL_SUCCESS, expectedContent); + checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes); + } + + @Test + void testInputRecordReferencesMissingFile() throws Exception { + // GIVEN + addJsonRecordReaderFactory(); + + String input = "[" + + "{" + + "\"drive.id\":\"missing\"," + + "\"filename\":\"missing_filename\"" + + "}" + + "]"; + + Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList( + new HashMap<String, String>() {{ + put("drive.id", "missing"); + put("filename", "missing_filename"); + put("error.code", "404"); + }} + )); + + // WHEN + testRunner.enqueue(input); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); + testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); + + checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); + } + + @Test + void testInputRecordsAreInvalid() throws Exception { + // GIVEN + addJsonRecordReaderFactory(); + + String input = "invalid json"; + + List<String> expectedContents = Arrays.asList("invalid json"); + + // WHEN + testRunner.enqueue(input); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); + testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); + + checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents); + } + + @Test + void testThrowExceptionBeforeRecordsAreProcessed() throws Exception { + // GIVEN + addJsonRecordReaderFactory(); + + File file = createFile("test_file.txt", mainFolderId); + + String validInputContent = "[" + + "{" + + "\"drive.id\":\"" + file.getId() + "\"," + + "\"filename\":\"" + file.getName() + "\"" + + "}" + + "]"; + + MockFlowFile input = new MockFlowFile(1) { + @Override + public Map<String, String> getAttributes() { + throw new RuntimeException("Intentional exception"); + } + + @Override + public String getContent() { + return validInputContent; + } + }; + + List<String> expectedContents = Arrays.asList(validInputContent); + + // WHEN + testRunner.enqueue(input); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0); + testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0); + + checkContent(FetchGoogleDrive.REL_INPUT_FAILURE, expectedContents); + } + + @Test + void testOneInputRecordOutOfManyThrowsUnexpectedException() throws Exception { + // GIVEN + AtomicReference<String> fileIdToThrowException = new AtomicReference<>(); + + testSubject = new FetchGoogleDrive() { + @Override + void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws IOException { + if (fileId.equals(fileIdToThrowException.get())) { + throw new RuntimeException(fileId + " intentionally forces exception"); + } + super.fetchFile(fileId, session, outFlowFile); + } + }; + testRunner = createTestRunner(); + + addJsonRecordReaderFactory(); + + File file1 = createFile("test_file_1.txt", "test_content_1", mainFolderId); + File file2 = createFile("test_file_2.txt", "test_content_2", mainFolderId); + + String input = "[" + + "{" + + "\"drive.id\":\"" + file1.getId() + "\"," + + "\"filename\":\"" + file1.getName() + "\"" + + "}," + + "{" + + "\"drive.id\":\"" + file2.getId() + "\"," + + "\"filename\":\"" + file2.getName() + "\"" + + "}" + + "]"; + + fileIdToThrowException.set(file2.getId()); + + Set<Map<String, String>> expectedSuccessAttributes = new HashSet<>(Arrays.asList( + new HashMap<String, String>() {{ + put("drive.id", file1.getId()); + put("filename", file1.getName()); + }} + )); + List<String> expectedSuccessContents = Arrays.asList("test_content_1"); + + Set<Map<String, String>> expectedFailureAttributes = new HashSet<>(Arrays.asList( + new HashMap<String, String>() {{ + put("drive.id", file2.getId()); + put("filename", file2.getName()); + put(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE, "N/A"); + }} + )); + + // WHEN + testRunner.enqueue(input); + testRunner.run(); + + // THEN + testRunner.assertTransferCount(FetchGoogleDrive.REL_INPUT_FAILURE, 0); + + checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedSuccessAttributes); + checkContent(FetchGoogleDrive.REL_SUCCESS, expectedSuccessContents); + + checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes); + checkContent(FetchGoogleDrive.REL_FAILURE, Arrays.asList("")); + } + + private void addJsonRecordReaderFactory() throws InitializationException { + RecordReaderFactory recordReader = new JsonTreeReader(); + testRunner.addControllerService("record_reader", recordReader); + testRunner.enableControllerService(recordReader); + testRunner.setProperty(FetchGoogleDrive.RECORD_READER, "record_reader"); + } + + public Set<String> getCheckedAttributeNames() { + Set<String> checkedAttributeNames = OutputChecker.super.getCheckedAttributeNames(); + + checkedAttributeNames.add(FetchGoogleDrive.ERROR_CODE_ATTRIBUTE); + + return checkedAttributeNames; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java index 6c184bc504..f352539064 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveIT.java @@ -16,27 +16,11 @@ */ package org.apache.nifi.processors.gcp.drive; -import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; -import com.google.api.client.http.AbstractInputStreamContent; -import com.google.api.client.http.ByteArrayContent; -import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.gson.GsonFactory; -import com.google.api.services.drive.Drive; -import com.google.api.services.drive.DriveScopes; import com.google.api.services.drive.model.File; -import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors; -import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; -import org.apache.nifi.processors.gcp.util.GoogleUtils; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -47,63 +31,20 @@ import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; /** - * Set the following constants before running:<br /> - * <br /> - * CREDENTIAL_JSON_FILE_PATH - A Service Account credentials JSON file.<br /> - * SHARED_FOLDER_ID - The ID of a Folder that is shared with the Service Account. The test will create files and sub-folders within this folder.<br /> - * <br /> - * Created files and folders are cleaned up, but it's advisable to dedicate a folder for this test so that it can be cleaned up easily should the test fail to do so. - * <br /><br /> - * WARNING: The creation of a file is not a synchronized operation so tests may fail because the processor may not list all of them. + * See Javadoc {@link AbstractGoogleDriveIT} for instructions how to run this test. */ -public class ListGoogleDriveIT { - public static final String CREDENTIAL_JSON_FILE_PATH = ""; - public static final String SHARED_FOLDER_ID = ""; - - public static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); - - private TestRunner testRunner; - - private Drive driveService; - - private String mainFolderId; - +public class ListGoogleDriveIT extends AbstractGoogleDriveIT<ListGoogleDrive> { @BeforeEach public void init() throws Exception { - ListGoogleDrive testSubject = new ListGoogleDrive(); - - testRunner = TestRunners.newTestRunner(testSubject); - - GCPCredentialsControllerService gcpCredentialsControllerService = new GCPCredentialsControllerService(); - testRunner.addControllerService("gcp_credentials_provider_service", gcpCredentialsControllerService); - testRunner.setProperty(gcpCredentialsControllerService, CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, CREDENTIAL_JSON_FILE_PATH); - testRunner.enableControllerService(gcpCredentialsControllerService); - testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp_credentials_provider_service"); - - NetHttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); - - driveService = new Drive.Builder( - httpTransport, - JSON_FACTORY, - testSubject.getHttpCredentialsAdapter( - testRunner.getProcessContext(), - DriveScopes.all() - ) - ) - .setApplicationName(this.getClass().getSimpleName()) - .build(); - - File file = createFolder("main", SHARED_FOLDER_ID); - - mainFolderId = file.getId(); + super.init(); testRunner.setProperty(ListGoogleDrive.FOLDER_ID, mainFolderId); } - @AfterEach - void tearDown() throws IOException { - driveService.files() - .delete(mainFolderId) - .execute(); + @Override + public ListGoogleDrive createTestSubject() { + ListGoogleDrive testSubject = new ListGoogleDrive(); + + return testSubject; } @Test @@ -114,18 +55,18 @@ public class ListGoogleDriveIT { File main_sub1_sub1 = createFolder("main_sub1_sub1", main_sub1.getId()); - createFile("main_file1", mainFolderId); - createFile("main_file2", mainFolderId); - createFile("main_file3", mainFolderId); + createFileWithDefaultContent("main_file1", mainFolderId); + createFileWithDefaultContent("main_file2", mainFolderId); + createFileWithDefaultContent("main_file3", mainFolderId); - createFile("main_sub1_file1", main_sub1.getId()); + createFileWithDefaultContent("main_sub1_file1", main_sub1.getId()); - createFile("main_sub2_file1", main_sub2.getId()); - createFile("main_sub2_file2", main_sub2.getId()); + createFileWithDefaultContent("main_sub2_file1", main_sub2.getId()); + createFileWithDefaultContent("main_sub2_file2", main_sub2.getId()); - createFile("main_sub1_sub1_file1", main_sub1_sub1.getId()); - createFile("main_sub1_sub1_file2", main_sub1_sub1.getId()); - createFile("main_sub1_sub1_file3", main_sub1_sub1.getId()); + createFileWithDefaultContent("main_sub1_sub1_file1", main_sub1_sub1.getId()); + createFileWithDefaultContent("main_sub1_sub1_file2", main_sub1_sub1.getId()); + createFileWithDefaultContent("main_sub1_sub1_file3", main_sub1_sub1.getId()); Set<String> expectedFileNames = new HashSet<>(Arrays.asList( "main_file1", "main_file2", "main_file3", @@ -179,7 +120,7 @@ public class ListGoogleDriveIT { // GIVEN testRunner.setProperty(ListGoogleDrive.MIN_AGE, "15 s"); - createFile("main_file", mainFolderId); + createFileWithDefaultContent("main_file", mainFolderId); // Make sure the file 'arrives' and could be listed Thread.sleep(5000); @@ -218,39 +159,4 @@ public class ListGoogleDriveIT { assertEquals(expectedFileNames, actualFileNames); } - private File createFolder(String folderName, String... parentFolderIds) throws IOException { - File fileMetaData = new File(); - fileMetaData.setName(folderName); - - if (parentFolderIds != null) { - fileMetaData.setParents(Arrays.asList(parentFolderIds)); - } - - fileMetaData.setMimeType("application/vnd.google-apps.folder"); - - Drive.Files.Create create = driveService.files() - .create(fileMetaData) - .setFields("id"); - - File file = create.execute(); - - return file; - } - - private File createFile(String name, String... folderIds) throws IOException { - File fileMetadata = new File(); - fileMetadata.setName(name); - fileMetadata.setParents(Arrays.asList(folderIds)); - - AbstractInputStreamContent content = new ByteArrayContent("text/plain", "test_content".getBytes(StandardCharsets.UTF_8)); - - Drive.Files.Create create = driveService.files() - .create(fileMetadata, content) - .setFields("id, modifiedTime"); - - File file = create.execute(); - - return file; - } - } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveSimpleTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveSimpleTest.java new file mode 100644 index 0000000000..e29fc62a57 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriveSimpleTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import com.google.api.client.http.HttpTransport; +import com.google.api.client.util.DateTime; +import com.google.api.services.drive.Drive; +import com.google.api.services.drive.model.File; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.EqualsWrapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; + +import static org.apache.nifi.util.EqualsWrapper.wrapList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ListGoogleDriveSimpleTest { + private ListGoogleDrive testSubject; + + private ProcessContext mockProcessContext; + private Drive mockDriverService; + + private String listingModeAsString = "EXECUTION"; + + @BeforeEach + void setUp() throws Exception { + mockProcessContext = mock(ProcessContext.class, RETURNS_DEEP_STUBS); + mockDriverService = mock(Drive.class, RETURNS_DEEP_STUBS); + + testSubject = new ListGoogleDrive() { + @Override + protected List<GoogleDriveFileInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode ignoredListingMode) throws IOException { + ListingMode acquiredListingMode = ListingMode.valueOf(listingModeAsString); + + return super.performListing(context, minTimestamp, acquiredListingMode); + } + + @Override + public Drive createDriveService(ProcessContext context, HttpTransport httpTransport, String... scopes) { + return mockDriverService; + } + }; + + testSubject.onScheduled(mockProcessContext); + } + + @Test + void testCreatedListableEntityContainsCorrectData() throws Exception { + // GIVEN + Long minTimestamp = 0L; + listingModeAsString = "EXECUTION"; + + String id = "id_1"; + String filename = "file_name_1"; + Long size = 125L; + long createdTime = 123456L; + long modifiedTime = 234567L; + String mimeType = "mime_type_1"; + + when(mockDriverService.files() + .list() + .setQ("('null' in parents) and (mimeType != 'application/vnd.google-apps.folder') and (mimeType != 'application/vnd.google-apps.shortcut') and trashed = false") + .setPageToken(null) + .setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)") + .execute() + .getFiles() + ).thenReturn(Arrays.asList( + createFile( + id, + filename, + size, + new DateTime(createdTime), + new DateTime(modifiedTime), + mimeType + ) + )); + + List<GoogleDriveFileInfo> expected = Arrays.asList( + new GoogleDriveFileInfo.Builder() + .id(id) + .fileName(filename) + .size(size) + .createdTime(createdTime) + .modifiedTime(modifiedTime) + .mimeType(mimeType) + .build() + ); + + // WHEN + List<GoogleDriveFileInfo> actual = testSubject.performListing(mockProcessContext, minTimestamp, null); + + // THEN + List<Function<GoogleDriveFileInfo, Object>> propertyProviders = Arrays.asList( + GoogleDriveFileInfo::getId, + GoogleDriveFileInfo::getIdentifier, + GoogleDriveFileInfo::getName, + GoogleDriveFileInfo::getSize, + GoogleDriveFileInfo::getTimestamp, + GoogleDriveFileInfo::getCreatedTime, + GoogleDriveFileInfo::getModifiedTime, + GoogleDriveFileInfo::getMimeType + ); + + List<EqualsWrapper<GoogleDriveFileInfo>> expectedWrapper = wrapList(expected, propertyProviders); + List<EqualsWrapper<GoogleDriveFileInfo>> actualWrapper = wrapList(actual, propertyProviders); + + assertEquals(expectedWrapper, actualWrapper); + } + + private File createFile( + String id, + String name, + Long size, + DateTime createdTime, + DateTime modifiedTime, + String mimeType + ) { + File file = new File(); + + file + .setId(id) + .setName(name) + .setMimeType(mimeType) + .setCreatedTime(createdTime) + .setModifiedTime(modifiedTime) + .setSize(size); + + return file; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java new file mode 100644 index 0000000000..c6d2ab4261 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/ListGoogleDriverTestRunnerTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import com.google.api.client.http.HttpTransport; +import com.google.api.client.util.DateTime; +import com.google.api.services.drive.Drive; +import com.google.api.services.drive.model.File; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.processors.gcp.util.GoogleUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ListGoogleDriverTestRunnerTest implements OutputChecker { + private ListGoogleDrive testSubject; + private TestRunner testRunner; + + private Drive mockDriverService; + + private String folderId = "folderId"; + + @BeforeEach + void setUp() throws Exception { + mockDriverService = mock(Drive.class, Mockito.RETURNS_DEEP_STUBS); + + testSubject = new ListGoogleDrive() { + @Override + protected List<GoogleDriveFileInfo> performListing(ProcessContext context, Long minTimestamp, ListingMode ignoredListingMode) throws IOException { + return super.performListing(context, minTimestamp, ListingMode.EXECUTION); + } + + @Override + public Drive createDriveService(ProcessContext context, HttpTransport httpTransport, String... scopes) { + return mockDriverService; + } + }; + + testRunner = TestRunners.newTestRunner(testSubject); + + String gcpCredentialsControllerServiceId = "gcp_credentials_provider_service"; + + GCPCredentialsControllerService gcpCredentialsControllerService = mock(GCPCredentialsControllerService.class, RETURNS_DEEP_STUBS); + when(gcpCredentialsControllerService.getIdentifier()).thenReturn(gcpCredentialsControllerServiceId); + + testRunner.addControllerService(gcpCredentialsControllerServiceId, gcpCredentialsControllerService); + testRunner.enableControllerService(gcpCredentialsControllerService); + testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, gcpCredentialsControllerServiceId); + + testRunner.setProperty(ListGoogleDrive.FOLDER_ID, folderId); + } + + @Test + void testOutputAsAttributesWhereTimestampIsCreatedTime() throws Exception { + // GIVEN + String id = "id_1"; + String filename = "file_name_1"; + Long size = 125L; + Long createdTime = 123456L; + Long modifiedTime = null; + String mimeType = "mime_type_1"; + + // WHEN + // THEN + testOutputAsAttributes(id, filename, size, createdTime, modifiedTime, mimeType, createdTime); + } + + @Test + void testOutputAsAttributesWhereTimestampIsModifiedTime() throws Exception { + // GIVEN + String id = "id_1"; + String filename = "file_name_1"; + Long size = 125L; + Long createdTime = 123456L; + Long modifiedTime = 123456L + 1L; + String mimeType = "mime_type_1"; + + // WHEN + // THEN + testOutputAsAttributes(id, filename, size, createdTime, modifiedTime, mimeType, modifiedTime); + } + + @Test + void testOutputAsContent() throws Exception { + // GIVEN + String id = "id_1"; + String filename = "file_name_1"; + Long size = 125L; + Long createdTime = 123456L; + Long modifiedTime = 123456L + 1L; + String mimeType = "mime_type_1"; + + addJsonRecordWriterFactory(); + + mockFetchedGoogleDriveFileList(id, filename, size, createdTime, modifiedTime, mimeType); + + List<String> expectedContents = Arrays.asList( + "[" + + "{" + + "\"drive.id\":\"" + id + "\"," + + "\"filename\":\"" + filename + "\"," + + "\"drive.size\":" + size + "," + + "\"drive.timestamp\":" + modifiedTime + "," + + "\"mime.type\":\"" + mimeType + "\"" + + "}" + + "]"); + + // WHEN + testRunner.run(); + + // THEN + checkContent(ListGoogleDrive.REL_SUCCESS, expectedContents); + } + + private void addJsonRecordWriterFactory() throws InitializationException { + RecordSetWriterFactory recordSetWriter = new JsonRecordSetWriter(); + testRunner.addControllerService("record_writer", recordSetWriter); + testRunner.enableControllerService(recordSetWriter); + testRunner.setProperty(ListGoogleDrive.RECORD_WRITER, "record_writer"); + } + + private void mockFetchedGoogleDriveFileList(String id, String filename, Long size, Long createdTime, Long modifiedTime, String mimeType) throws IOException { + when(mockDriverService.files() + .list() + .setQ("('" + folderId + "' in parents) and (mimeType != 'application/vnd.google-apps.folder') and (mimeType != 'application/vnd.google-apps.shortcut') and trashed = false") + .setPageToken(null) + .setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)") + .execute() + .getFiles() + ).thenReturn(Arrays.asList( + createFile( + id, + filename, + size, + Optional.ofNullable(createdTime).map(DateTime::new).orElse(null), + Optional.ofNullable(modifiedTime).map(DateTime::new).orElse(null), + mimeType + ) + )); + } + + private void testOutputAsAttributes(String id, String filename, Long size, Long createdTime, Long modifiedTime, String mimeType, Long expectedTimestamp) throws IOException { + // GIVEN + mockFetchedGoogleDriveFileList(id, filename, size, createdTime, modifiedTime, mimeType); + + Map<String, String> inputFlowFileAttributes = new HashMap<>(); + inputFlowFileAttributes.put("drive.id", id); + inputFlowFileAttributes.put("filename", filename); + inputFlowFileAttributes.put("drive.size", "" + size); + inputFlowFileAttributes.put("drive.timestamp", "" + expectedTimestamp); + inputFlowFileAttributes.put("mime.type", mimeType); + + HashSet<Map<String, String>> expectedAttributes = new HashSet<>(Arrays.asList(inputFlowFileAttributes)); + + // WHEN + testRunner.run(); + + // THEN + checkAttributes(ListGoogleDrive.REL_SUCCESS, expectedAttributes); + } + + private File createFile( + String id, + String name, + Long size, + DateTime createdTime, + DateTime modifiedTime, + String mimeType + ) { + File file = new File(); + + file + .setId(id) + .setName(name) + .setMimeType(mimeType) + .setCreatedTime(createdTime) + .setModifiedTime(modifiedTime) + .setSize(size); + + return file; + } + + @Override + public TestRunner getTestRunner() { + return testRunner; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/OutputChecker.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/OutputChecker.java new file mode 100644 index 0000000000..1c49f6ecfe --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/OutputChecker.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public interface OutputChecker { + TestRunner getTestRunner(); + + default void checkAttributes(Relationship relationship, Set<Map<String, String>> expectedAttributes) { + getTestRunner().assertTransferCount(relationship, expectedAttributes.size()); + List<MockFlowFile> flowFiles = getTestRunner().getFlowFilesForRelationship(relationship); + + Set<String> checkedAttributeNames = getCheckedAttributeNames(); + + Set<Map<String, String>> actualAttributes = flowFiles.stream() + .map(flowFile -> flowFile.getAttributes().entrySet().stream() + .filter(attributeNameAndValue -> checkedAttributeNames.contains(attributeNameAndValue.getKey())) + .filter(entry -> entry.getKey() != null && entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + + ) + .collect(Collectors.toSet()); + + assertEquals(expectedAttributes, actualAttributes); + } + + default Set<String> getCheckedAttributeNames() { + Set<String> checkedAttributeNames = Arrays.stream(GoogleDriveFlowFileAttribute.values()) + .map(GoogleDriveFlowFileAttribute::getName) + .collect(Collectors.toSet()); + + return checkedAttributeNames; + } + + default void checkContent(Relationship relationship, List<String> expectedContent) { + getTestRunner().assertTransferCount(relationship, expectedContent.size()); + List<MockFlowFile> flowFiles = getTestRunner().getFlowFilesForRelationship(relationship); + + List<String> actualContent = flowFiles.stream() + .map(flowFile -> flowFile.getContent()) + .collect(Collectors.toList()); + + assertEquals(expectedContent, actualContent); + } +}