This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 025109a9d8 NIFI-12123: This closes #7786. Added additional @UseCase and @MultiProcessorUseCase annotations as well as some trivial cleanup that was flagged by IntelliJ such as making inner class static, updating some deprecated references, etc. 025109a9d8 is described below commit 025109a9d89b78136e888c69b8fbb817a75ae33e Author: Mark Payne <marka...@hotmail.com> AuthorDate: Mon Sep 25 09:04:44 2023 -0400 NIFI-12123: This closes #7786. Added additional @UseCase and @MultiProcessorUseCase annotations as well as some trivial cleanup that was flagged by IntelliJ such as making inner class static, updating some deprecated references, etc. Signed-off-by: Joseph Witt <joew...@apache.org> --- .../azure/storage/FetchAzureBlobStorage_v12.java | 44 +++++++++--- .../azure/storage/FetchAzureDataLakeStorage.java | 42 ++++++++++-- .../processors/gcp/drive/FetchGoogleDrive.java | 80 +++++++++++++--------- .../processors/gcp/storage/FetchGCSObject.java | 53 ++++++++++---- .../apache/nifi/processors/standard/FetchFTP.java | 35 +++++++++- .../apache/nifi/processors/standard/FetchSFTP.java | 33 ++++++++- .../processors/standard/RemoveRecordField.java | 22 +++++- 7 files changed, 243 insertions(+), 66 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java index 4d9db2d5b1..18ca4a6d0f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java @@ -20,11 +20,20 @@ import com.azure.storage.blob.BlobClient; import 
com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.models.BlobRange; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -40,15 +49,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.ClientSideEncryptionSupport; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER; @@ -81,6 +81,32 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG), @WritesAttribute(attribute = 
ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP), @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)}) +@MultiProcessorUseCase( + description = "Retrieve all files in an Azure Blob Storage container", + keywords = {"azure", "blob", "storage", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListAzureBlobStorage_v12.class, + configuration = """ + The "Container Name" property should be set to the name of the Blob Storage Container that files reside in. \ + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_CONTAINER}`. + + The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container. + + The 'success' Relationship of this Processor is then connected to FetchAzureBlobStorage_v12. + """ + ), + @ProcessorConfiguration( + processorClass = FetchAzureBlobStorage_v12.class, + configuration = """ + "Container Name" = "${azure.container}" + "Blob Name" = "${azure.blobname}" + + The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container. 
+ """ + ) + } +) public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport { public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index 5e8ebeb618..d977120d33 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -24,11 +24,17 @@ import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.models.DataLakeStorageException; import com.azure.storage.file.datalake.models.DownloadRetryOptions; import com.azure.storage.file.datalake.models.FileRange; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -42,20 +48,44 @@ import org.apache.nifi.processor.util.StandardValidators; import 
org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) @SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) -@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage") +@CapabilityDescription("Fetch the specified file from Azure Data Lake Storage") @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = "azure.datalake.storage.statusCode", description = "The HTTP error code (if available) from the failed operation"), @WritesAttribute(attribute = "azure.datalake.storage.errorCode", description = "The Azure Data Lake Storage moniker of the failed operation"), @WritesAttribute(attribute = "azure.datalake.storage.errorMessage", description = "The Azure Data Lake Storage error message from the failed operation") }) +@MultiProcessorUseCase( + description = "Retrieve all files in an Azure DataLake Storage directory", + keywords = {"azure", "datalake", "adls", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListAzureDataLakeStorage.class, + configuration = """ + The "Filesystem Name" property should be set to the name of the Azure Filesystem (also known as a Container) that files reside in. \ + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_FILESYSTEM}`. + Configure the "Directory Name" property to specify the name of the directory in the file system. \ + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_DIRECTORY}`. 
+ + The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem. + + The 'success' Relationship of this Processor is then connected to FetchAzureDataLakeStorage. + """ + ), + @ProcessorConfiguration( + processorClass = FetchAzureDataLakeStorage.class, + configuration = """ + "Filesystem Name" = "${azure.filesystem}" + "Directory Name" = "${azure.directory}" + "File Name" = "${azure.filename}" + + The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem. + """ + ) + } +) public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java index bbdb555042..5cae9e97d9 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java @@ -20,11 +20,20 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.services.drive.Drive; import com.google.api.services.drive.DriveScopes; import com.google.api.services.drive.model.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import 
org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -43,17 +52,6 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; import org.apache.nifi.processors.gcp.util.GoogleUtils; import org.apache.nifi.proxy.ProxyConfiguration; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE; import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC; import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE; @@ -82,6 +80,33 @@ import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTA @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC), @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC) }) +@MultiProcessorUseCase( + description = "Retrieve all files in a Google Drive folder", + keywords = {"google", "drive", "google cloud", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListGoogleDrive.class, + configuration = """ + The "Folder ID" property should be set to the ID of the Google Drive folder that files reside in. 
\ + See processor documentation / additional details for more information on how to determine a Google Drive folder's ID. + If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{GOOGLE_DRIVE_FOLDER_ID}`. + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the folder. + + The 'success' Relationship of this Processor is then connected to FetchGoogleDrive. + """ + ), + @ProcessorConfiguration( + processorClass = FetchGoogleDrive.class, + configuration = """ + "File ID" = "${drive.id}" + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the file. + """ + ) + } +) public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait { // Google Docs Export Types @@ -195,8 +220,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr - - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile will be routed here for each successfully fetched File.") @@ -207,7 +230,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr .description("A FlowFile will be routed here for each File for which fetch was attempted but failed.") .build(); - private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + private static final List<PropertyDescriptor> PROPERTIES = List.of( GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, FILE_ID, ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS), @@ -215,12 +238,9 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr GOOGLE_SPREADSHEET_EXPORT_TYPE, GOOGLE_PRESENTATION_EXPORT_TYPE, GOOGLE_DRAWING_EXPORT_TYPE - )); + );
- public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, - REL_FAILURE - ))); + public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); private volatile Drive driveService; @@ -279,20 +299,14 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr return null; } - switch (mimeType) { - case "application/vnd.google-apps.document": - return context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.spreadsheet": - return context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.presentation": - return context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.drawing": - return context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.script": - return "application/vnd.google-apps.script+json"; - default: - return null; - } + return switch (mimeType) { + case "application/vnd.google-apps.document" -> context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.spreadsheet" -> context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.presentation" -> context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.drawing" -> context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.script" -> "application/vnd.google-apps.script+json"; + default -> null; + }; } private FlowFile fetchFile(final String fileId, final ProcessSession session, final ProcessContext context, final FlowFile flowFile, final Map<String, String> attributeMap) throws IOException { diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java index 50ca5bb6d7..ee9a6d6b5a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java @@ -22,6 +22,14 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.output.CountingOutputStream; @@ -30,6 +38,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.ConfigVerificationResult; @@ -45,15 +55,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.Channels; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import 
java.util.concurrent.TimeUnit; - import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR; @@ -129,6 +130,33 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC; @WritesAttribute(attribute = OWNER_TYPE_ATTR, description = OWNER_TYPE_DESC), @WritesAttribute(attribute = URI_ATTR, description = URI_DESC) }) +@MultiProcessorUseCase( + description = "Retrieve all files in a Google Cloud Storage (GCS) bucket", + keywords = {"gcp", "gcs", "google cloud", "google cloud storage", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListGCSBucket.class, + configuration = """ + The "Bucket" property should be set to the name of the GCS bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{GCS_SOURCE_BUCKET}`. + Configure the "Project ID" property to reflect the ID of your Google Cloud Project. + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket. + + The 'success' Relationship of this Processor is then connected to FetchGCSObject. + """ + ), + @ProcessorConfiguration( + processorClass = FetchGCSObject.class, + configuration = """ + "Bucket" = "${gcs.bucket}" + "Name" = "${filename}" + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.
+ """ + ) + } +) public class FetchGCSObject extends AbstractGCSProcessor { public static final PropertyDescriptor BUCKET = new PropertyDescriptor .Builder().name("gcs-bucket") @@ -219,7 +247,7 @@ public class FetchGCSObject extends AbstractGCSProcessor { try { final FetchedBlob blob = fetchBlob(context, storage, attributes); - final CountingOutputStream out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM); + final CountingOutputStream out = new CountingOutputStream(NullOutputStream.INSTANCE); IOUtils.copy(blob.contents, out); final long byteCount = out.getByteCount(); results.add(new ConfigVerificationResult.Builder() @@ -253,9 +281,6 @@ public class FetchGCSObject extends AbstractGCSProcessor { final Storage storage = getCloudService(); - final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L); - final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? 
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null); - try { final FetchedBlob blob = fetchBlob(context, storage, flowFile.getAttributes()); flowFile = session.importFrom(blob.contents, flowFile); @@ -328,7 +353,7 @@ public class FetchGCSObject extends AbstractGCSProcessor { return blobSourceOptions; } - private class FetchedBlob { + private static class FetchedBlob { private final InputStream contents; private final Blob blob; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index cf6df69d1e..d920cb1085 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -25,16 +25,19 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer; +import 
org.apache.nifi.processors.standard.util.FileTransfer; // Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted. + @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"ftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) @CapabilityDescription("Fetches the content of a file from a remote FTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.") @@ -46,6 +49,36 @@ import org.apache.nifi.processors.standard.util.FTPTransfer; @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") }) +@MultiProcessorUseCase( + description = "Retrieve all files in a directory of an FTP Server", + keywords = {"ftp", "file", "transform", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListFTP.class, + configuration = """ + The "Hostname" property should be set to the fully qualified hostname of the FTP Server. It's a good idea to parameterize \ + this property by setting it to something like `#{FTP_SERVER}`. + The "Remote Path" property must be set to the directory on the FTP Server where the files reside. If the flow being built is to be reused elsewhere, \ + it's a good idea to parameterize this property by setting it to something like `#{FTP_REMOTE_PATH}`. + Configure the "Username" property to the appropriate username for logging into the FTP Server. It's usually a good idea to parameterize this property \ + by setting it to something like `#{FTP_USERNAME}`. + Configure the "Password" property to the appropriate password for the provided username. 
It's usually a good idea to parameterize this property \ + by setting it to something like `#{FTP_PASSWORD}`. + + The 'success' Relationship of this Processor is then connected to FetchFTP. + """ + ), + @ProcessorConfiguration( + processorClass = FetchFTP.class, + configuration = """ + "Hostname" = "${ftp.remote.host}" + "Remote File" = "${path}/${filename}" + "Username" = "${ftp.listing.user}" + "Password" = "#{FTP_PASSWORD}" + """ + ) + } +) public class FetchFTP extends FetchFileTransfer { @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 69281e5695..9fde9114e4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -20,12 +20,13 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -48,6 +49,36 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; @WritesAttribute(attribute = "filename", description 
= "The filename is updated to point to the filename fo the remote file"), @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") }) +@MultiProcessorUseCase( + description = "Retrieve all files in a directory of an SFTP Server", + keywords = {"sftp", "secure", "file", "transform", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListSFTP.class, + configuration = """ + The "Hostname" property should be set to the fully qualified hostname of the SFTP Server. It's a good idea to parameterize \ + this property by setting it to something like `#{SFTP_SERVER}`. + The "Remote Path" property must be set to the directory on the SFTP Server where the files reside. If the flow being built is to be reused elsewhere, \ + it's a good idea to parameterize this property by setting it to something like `#{SFTP_REMOTE_PATH}`. + Configure the "Username" property to the appropriate username for logging into the SFTP Server. It's usually a good idea to parameterize this property \ + by setting it to something like `#{SFTP_USERNAME}`. + Configure the "Password" property to the appropriate password for the provided username. It's usually a good idea to parameterize this property \ + by setting it to something like `#{SFTP_PASSWORD}`. + + The 'success' Relationship of this Processor is then connected to FetchSFTP.
+ """ + ), + @ProcessorConfiguration( + processorClass = FetchSFTP.class, + configuration = """ + "Hostname" = "${sftp.remote.host}" + "Remote File" = "${path}/${filename}" + "Username" = "${sftp.listing.user}" + "Password" = "#{SFTP_PASSWORD}" + """ + ) + } +) public class FetchSFTP extends FetchFileTransfer { @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java index c1d38c4326..7b3b8df86f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java @@ -31,6 +31,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -58,10 +59,27 @@ import org.apache.nifi.serialization.record.Record; @WritesAttributes({ @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.") }) -@DynamicProperty(name = "(Ignored)", value = "A RecordPath to the field to be removed.", - description = "Allows users to specify fields to remove that match the RecordPath.", +@DynamicProperty(name = "A description of the field to remove", + value = "A RecordPath to the field to be removed.", + description = "Any field that 
matches the RecordPath set as the value will be removed.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @SeeAlso({UpdateRecord.class}) +@UseCase( + description = "Remove one or more fields from a Record", + keywords = {"record", "field", "drop", "remove", "delete", "expunge", "recordpath"}, + configuration = """ + Configure the Record Reader according to the incoming data format. + Configure the Record Writer according to the desired output format. + + For each field that you want to remove, add a single new property to the Processor. + The name of the property can be anything but it's recommended to use a brief description of the field. + The value of the property is a RecordPath that matches the field to remove. + + For example, to remove the `name` and `email` fields, add two Properties: + `name` = `/name` + `email` = `/email` + """ +) public class RemoveRecordField extends AbstractRecordProcessor { private volatile RecordPathCache recordPathCache;