http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index f0061b8..fe277df 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.util.List; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.util.StandardValidators; @@ -34,6 +35,8 @@ public interface FileTransfer extends Closeable { InputStream getInputStream(String remoteFileName) throws IOException; + InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException; + void flush() throws IOException; FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; @@ -51,127 +54,127 @@ public interface FileTransfer extends Closeable { void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException; public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() - .name("Hostname") - .description("The fully qualified hostname or IP address of the remote system") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(true) - .expressionLanguageSupported(true) - .build(); + .name("Hostname") + .description("The fully qualified hostname or IP address of the remote system") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() - .name("Username") - .description("Username") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(true) - .build(); + .name("Username") + .description("Username") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() - .name("Password") - .description("Password for the user account") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .sensitive(true) - .build(); + .name("Password") + .description("Password for the user account") + .addValidator(Validator.VALID) + .required(false) + .sensitive(true) + .build(); public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder() - .name("Data Timeout") - .description("Amount of time to wait before timing out while transferring data") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("30 sec") - .build(); + .name("Data Timeout") + .description("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 sec") + .build(); public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() - .name("Connection Timeout") - .description("Amount of time to wait before timing out while creating a connection") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("30 sec") - .build(); + .name("Connection Timeout") + .description("Amount of time to wait before timing out while creating a connection") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 sec") + .build(); public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder() - .name("Remote Path") - .description("The path on the remote system from which to pull or push files") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Remote Path") + .description("The path on the remote system from which to pull or push files") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor CREATE_DIRECTORY = new PropertyDescriptor.Builder() - .name("Create Directory") - .description("Specifies whether or not the remote directory should be created if it does not exist.") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); + .name("Create Directory") + .description("Specifies whether or not the remote directory should be created if it does not exist.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); public static final PropertyDescriptor USE_COMPRESSION = new PropertyDescriptor.Builder() - .name("Use Compression") - .description("Indicates whether or not ZLIB compression should be used when transferring files") - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); + .name("Use Compression") + .description("Indicates whether or not ZLIB compression should be used when transferring files") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); // GET-specific properties public static final PropertyDescriptor RECURSIVE_SEARCH = new PropertyDescriptor.Builder() - .name("Search Recursively") - .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories") - .required(true) - .defaultValue("false") - .allowableValues("true", "false") - .build(); + .name("Search Recursively") + .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories") + .required(true) + .defaultValue("false") + .allowableValues("true", "false") + .build(); public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder() - .name("File Filter Regex") - .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched") - .required(false) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .build(); + .name("File Filter Regex") + .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); public static final PropertyDescriptor PATH_FILTER_REGEX = new PropertyDescriptor.Builder() - .name("Path Filter Regex") - .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned") - .required(false) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .build(); + .name("Path Filter Regex") + .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); public static final PropertyDescriptor MAX_SELECTS = new PropertyDescriptor.Builder() - .name("Max Selects") - .description("The maximum number of files to pull in a single connection") - .defaultValue("100") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); + .name("Max Selects") + .description("The maximum number of files to pull in a single connection") + .defaultValue("100") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); public static final PropertyDescriptor REMOTE_POLL_BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Remote Poll Batch Size") - .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value " - + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can " - + "be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower " - + "than normal.") - .defaultValue("5000") - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .required(true) - .build(); + .name("Remote Poll Batch Size") + .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value " + + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can " + + "be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower " + + "than normal.") + .defaultValue("5000") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .required(true) + .build(); public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder() - .name("Delete Original") - .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred") - .defaultValue("true") - .allowableValues("true", "false") - .required(true) - .build(); + .name("Delete Original") + .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred") + .defaultValue("true") + .allowableValues("true", "false") + .required(true) + .build(); public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder() - .name("Polling Interval") - .description("Determines how long to wait between fetching the listing for new files") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .required(true) - .defaultValue("60 sec") - .build(); + .name("Polling Interval") + .description("Determines how long to wait between fetching the listing for new files") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .required(true) + .defaultValue("60 sec") + .build(); public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() - .name("Ignore Dotted Files") - .description("If true, files whose names begin with a dot (\".\") will be ignored") - .allowableValues("true", "false") - .defaultValue("true") - .required(true) - .build(); + .name("Ignore Dotted Files") + .description("If true, files whose names begin with a dot (\".\") will be ignored") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); public static final PropertyDescriptor USE_NATURAL_ORDERING = new PropertyDescriptor.Builder() - .name("Use Natural Ordering") - .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined") - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); + .name("Use Natural Ordering") + .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); // PUT-specific properties public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; @@ -183,77 +186,77 @@ public interface FileTransfer extends Closeable { public static final String CONFLICT_RESOLUTION_FAIL = "FAIL"; public static final String CONFLICT_RESOLUTION_NONE = "NONE"; public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() - .name("Conflict Resolution") - .description("Determines how to handle the problem of filename collisions") - .required(true) - .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE) - .defaultValue(CONFLICT_RESOLUTION_NONE) - .build(); + .name("Conflict Resolution") + .description("Determines how to handle the problem of filename collisions") + .required(true) + .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE) + .defaultValue(CONFLICT_RESOLUTION_NONE) + .build(); public static final PropertyDescriptor REJECT_ZERO_BYTE = new PropertyDescriptor.Builder() - .name("Reject Zero-Byte Files") - .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer") - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Reject Zero-Byte Files") + .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer") + .allowableValues("true", "false") + .defaultValue("true") + .build(); public static final PropertyDescriptor DOT_RENAME = new PropertyDescriptor.Builder() - .name("Dot Rename") - .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the " - + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the " - + "Temporary Filename property is set.") - .allowableValues("true", "false") - .defaultValue("true") - .build(); + .name("Dot Rename") + .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the " + + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the " + + "Temporary Filename property is set.") + .allowableValues("true", "false") + .defaultValue("true") + .build(); public static final PropertyDescriptor TEMP_FILENAME = new PropertyDescriptor.Builder() - .name("Temporary Filename") - .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful " - + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .required(false) - .build(); + .name("Temporary Filename") + .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful " + + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); public static final PropertyDescriptor LAST_MODIFIED_TIME = new PropertyDescriptor.Builder() - .name("Last Modified Time") - .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. " - + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value " - + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Last Modified Time") + .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. " + + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value " + + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder() - .name("Permissions") - .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of " - + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may " - + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will " - + "fail to change permissions of the file.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Permissions") + .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of " + + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may " + + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will " + + "fail to change permissions of the file.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder() - .name("Remote Owner") - .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. " - + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but " - + "will fail to change the owner of the file.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Remote Owner") + .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. " + + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but " + + "will fail to change the owner of the file.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder() - .name("Remote Group") - .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. " - + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but " - + "will fail to change the group of the file.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Remote Group") + .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. " + + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but " + + "will fail to change the group of the file.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The maximum number of FlowFiles to send in a single connection") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("500") - .build(); + .name("Batch Size") + .description("The maximum number of FlowFiles to send in a single connection") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("500") + .build(); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java new file mode 100644 index 0000000..6e019ff --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.util; + +public interface ListableEntity { + + /** + * @return The name of the remote entity + */ + String getName(); + + /** + * @return the identifier of the remote entity. This may or may not be the same as the name of the + * entity but should be unique across all entities. + */ + String getIdentifier(); + + + /** + * @return the timestamp for this entity so that we can be efficient about not performing listings of the same + * entities multiple times + */ + long getTimestamp(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java new file mode 100644 index 0000000..465995e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.util; + +import java.io.IOException; + +public class PermissionDeniedException extends IOException { + private static final long serialVersionUID = -6215434916883053982L; + + public PermissionDeniedException(final String message) { + super(message); + } + + public PermissionDeniedException(final String message, final Throwable t) { + super(message, t); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index 19955e7..c28f275 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard.util; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; @@ -51,45 +52,45 @@ import com.jcraft.jsch.SftpException; public class SFTPTransfer implements FileTransfer { public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder() - .name("Private Key Path") - .description("The fully qualified path to the Private Key file") - .required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); + .name("Private Key Path") + .description("The fully qualified path to the Private Key file") + .required(false) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder() - .name("Private Key Passphrase") - .description("Password for the private key") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); + .name("Private Key Passphrase") + .description("Password for the private key") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder() - .name("Host Key File") - .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used") - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .required(false) - .build(); + .name("Host Key File") + .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used") + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .required(false) + .build(); public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder() - .name("Strict Host Key Checking") - .description("Indicates whether or not strict enforcement of hosts keys should be applied") - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); + .name("Strict Host Key Checking") + .description("Indicates whether or not strict enforcement of hosts keys should be applied") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Port") - .description("The port that the remote system is listening on for file transfers") - .addValidator(StandardValidators.PORT_VALIDATOR) - .required(true) - .defaultValue("22") - .build(); + .name("Port") + .description("The port that the remote system is listening on for file transfers") + .addValidator(StandardValidators.PORT_VALIDATOR) + .required(true) + .defaultValue("22") + .build(); public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder() - .name("Send Keep Alive On Timeout") - .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out") - .allowableValues("true", "false") - .defaultValue("true") - .required(true) - .build(); + .name("Send Keep Alive On Timeout") + .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); /** * Dynamic property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling @@ -99,12 +100,12 @@ public class SFTPTransfer implements FileTransfer { * This property is dynamic until deemed a worthy inclusion as proper. */ public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder() - .name("Disable Directory Listing") - .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .dynamic(true) - .defaultValue("false") - .build(); + .name("Disable Directory Listing") + .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .dynamic(true) + .defaultValue("false") + .build(); private final ProcessorLog logger; @@ -133,7 +134,16 @@ public class SFTPTransfer implements FileTransfer { public List<FileInfo> getListing() throws IOException { final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue(); final int depth = 0; - final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger(); + + final int maxResults; + final PropertyValue batchSizeValue = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE); + if (batchSizeValue == null) { + maxResults = Integer.MAX_VALUE; + } else { + final Integer configuredValue = batchSizeValue.asInteger(); + maxResults = configuredValue == null ? Integer.MAX_VALUE : configuredValue; + } + final List<FileInfo> listing = new ArrayList<>(1000); getListing(path, depth, maxResults, listing); return listing; @@ -222,7 +232,15 @@ public class SFTPTransfer implements FileTransfer { sftp.ls(path, filter); } } catch (final SftpException e) { - throw new IOException("Failed to obtain file listing for " + (path == null ? "current directory" : path), e); + final String pathDesc = path == null ? "current directory" : path; + switch (e.id) { + case ChannelSftp.SSH_FX_NO_SUCH_FILE: + throw new FileNotFoundException("Could not perform listing on " + pathDesc + " because could not find the file on the remote server"); + case ChannelSftp.SSH_FX_PERMISSION_DENIED: + throw new PermissionDeniedException("Could not perform listing on " + pathDesc + " due to insufficient permissions"); + default: + throw new IOException("Failed to obtain file listing for " + pathDesc, e); + } } for (final LsEntry entry : subDirs) { @@ -251,24 +269,36 @@ public class SFTPTransfer implements FileTransfer { } FileInfo.Builder builder = new FileInfo.Builder() - .filename(entry.getFilename()) - .fullPathFileName(newFullForwardPath) - .directory(entry.getAttrs().isDir()) - .size(entry.getAttrs().getSize()) - .lastModifiedTime(entry.getAttrs().getMTime() * 1000L) - .permissions(perms) - .owner(Integer.toString(entry.getAttrs().getUId())) - .group(Integer.toString(entry.getAttrs().getGId())); + .filename(entry.getFilename()) + .fullPathFileName(newFullForwardPath) + .directory(entry.getAttrs().isDir()) + .size(entry.getAttrs().getSize()) + .lastModifiedTime(entry.getAttrs().getMTime() * 1000L) + .permissions(perms) + .owner(Integer.toString(entry.getAttrs().getUId())) + .group(Integer.toString(entry.getAttrs().getGId())); return builder.build(); } @Override public InputStream getInputStream(final String remoteFileName) throws IOException { - final ChannelSftp sftp = getChannel(null); + return getInputStream(remoteFileName, null); + } + + @Override + public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { + final ChannelSftp sftp = getChannel(flowFile); try { return sftp.get(remoteFileName); } catch (final SftpException e) { - throw new IOException("Failed to obtain file content for " + remoteFileName, e); + switch (e.id) { + case ChannelSftp.SSH_FX_NO_SUCH_FILE: + throw new FileNotFoundException("Could not find file " + remoteFileName + " on remote SFTP Server"); + case ChannelSftp.SSH_FX_PERMISSION_DENIED: + throw new PermissionDeniedException("Insufficient permissions to read file " + remoteFileName + " from remote SFTP Server", e); + default: + throw new IOException("Failed to obtain file content for " + remoteFileName, e); + } } } @@ -283,7 +313,14 @@ public class SFTPTransfer implements FileTransfer { try { sftp.rm(fullPath); } catch (final SftpException e) { - throw new IOException("Failed to delete remote file " + fullPath, e); + switch (e.id) { + case ChannelSftp.SSH_FX_NO_SUCH_FILE: + throw new FileNotFoundException("Could not find file " + remoteFileName + " to remove from remote SFTP Server"); + case ChannelSftp.SSH_FX_PERMISSION_DENIED: + throw new PermissionDeniedException("Insufficient permissions to delete file " + remoteFileName + " from remote SFTP Server", e); + default: + throw new IOException("Failed to delete remote file " + fullPath, e); + } } } @@ -333,10 +370,10 @@ public class SFTPTransfer implements FileTransfer { if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) { ensureDirectoryExists(flowFile, directoryName.getParentFile()); } - logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory}); + logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory}); try { channel.mkdir(remoteDirectory); - logger.debug("Created {}", new Object[]{remoteDirectory}); + logger.debug("Created {}", new Object[] {remoteDirectory}); } catch (final SftpException e) { throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e); } @@ -358,9 +395,9 @@ public class SFTPTransfer implements FileTransfer { final JSch jsch = new JSch(); try { - final Session session = jsch.getSession(ctx.getProperty(USERNAME).getValue(), - ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(), - ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue()); + final Session session = jsch.getSession(ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(), + ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(), + ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue()); final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue(); if (hostKeyVal != null) { @@ -371,7 +408,8 @@ public class SFTPTransfer implements FileTransfer { properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no"); properties.setProperty("PreferredAuthentications", "publickey,password"); - if (ctx.getProperty(FileTransfer.USE_COMPRESSION).asBoolean()) { + final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION); + if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) { properties.setProperty("compression.s2c", "z...@openssh.com,zlib,none"); properties.setProperty("compression.c2s", "z...@openssh.com,zlib,none"); } else { @@ -381,12 +419,12 @@ public class SFTPTransfer implements FileTransfer { session.setConfig(properties); - final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).getValue(); + final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue(); if (privateKeyFile != null) { - jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).getValue()); + jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue()); } - final String password = ctx.getProperty(FileTransfer.PASSWORD).getValue(); + final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue(); if (password != null) { session.setPassword(password); } @@ -428,7 +466,7 @@ public class SFTPTransfer implements FileTransfer { sftp.exit(); } } catch (final Exception ex) { - logger.warn("Failed to close ChannelSftp due to {}", new Object[]{ex.toString()}, ex); + logger.warn("Failed to close ChannelSftp due to {}", new Object[] {ex.toString()}, ex); } sftp = null; @@ -437,7 +475,7 @@ public class SFTPTransfer implements FileTransfer { session.disconnect(); } } catch (final Exception ex) { - logger.warn("Failed to close session due to {}", new Object[]{ex.toString()}, ex); + logger.warn("Failed to close session due to {}", new Object[] {ex.toString()}, ex); } session = null; } @@ -515,7 +553,7 @@ public class SFTPTransfer implements FileTransfer { int time = (int) (fileModifyTime.getTime() / 1000L); sftp.setMtime(tempPath, time); } catch (final Exception e) { - logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{tempPath, lastModifiedTime, e}); + logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e}); } } @@ -527,7 +565,7 @@ public class SFTPTransfer implements FileTransfer { sftp.chmod(perms, tempPath); } } catch (final Exception e) { - logger.error("Failed to set permission on {} to {} due to {}", new Object[]{tempPath, permissions, e}); + logger.error("Failed to set permission on {} to {} due to {}", new Object[] {tempPath, permissions, e}); } } @@ -536,7 +574,7 @@ public class SFTPTransfer implements FileTransfer { try { sftp.chown(Integer.parseInt(owner), tempPath); } catch (final Exception e) { - logger.error("Failed to set owner on {} to {} due to {}", new Object[]{tempPath, owner, e}); + logger.error("Failed to set owner on {} to {} due to {}", new Object[] {tempPath, owner, e}); } } @@ -545,7 +583,7 @@ public class SFTPTransfer implements FileTransfer { try { sftp.chgrp(Integer.parseInt(group), tempPath); } catch (final Exception e) { - logger.error("Failed to set group on {} to {} due to {}", new Object[]{tempPath, group, e}); + logger.error("Failed to set group on {} to {} due to {}", new Object[] {tempPath, group, e}); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index ff39ad3..b12fb6f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -28,6 +28,7 @@ org.apache.nifi.processors.standard.EvaluateXQuery org.apache.nifi.processors.standard.ExecuteStreamCommand org.apache.nifi.processors.standard.ExecuteProcess org.apache.nifi.processors.standard.ExtractText +org.apache.nifi.processors.standard.FetchSFTP org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GetFile org.apache.nifi.processors.standard.GetFTP @@ -43,6 +44,7 @@ org.apache.nifi.processors.standard.GetJMSQueue org.apache.nifi.processors.standard.GetJMSTopic org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenUDP +org.apache.nifi.processors.standard.ListSFTP org.apache.nifi.processors.standard.LogAttribute org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.ModifyBytes http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java new file mode 100644 index 0000000..ba84939 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.standard.util.ListableEntity; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestAbstractListProcessor { + + @Test + public void testOnlyNewEntriesEmitted() { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", 1492L); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + + proc.addEntity("name", "id2", 1492L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + + proc.addEntity("name", "id2", 1492L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id3", 1491L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id2", 1492L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id2", 1493L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + + proc.addEntity("name", "id2", 1493L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id2", 1493L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id", 1494L); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + @Test + public void testStateStoredInDistributedService() throws InitializationException { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + final DistributedCache cache = new DistributedCache(); + runner.addControllerService("cache", cache); + runner.enableControllerService(cache); + runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); + + runner.run(); + + proc.addEntity("name", "id", 1492L); + runner.run(); + + assertEquals(1, cache.stored.size()); + } + + @Test + public void testFetchOnStart() throws InitializationException { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + final DistributedCache cache = new DistributedCache(); + runner.addControllerService("cache", cache); + runner.enableControllerService(cache); + runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); + + runner.run(); + + assertEquals(1, cache.fetchCount); + } + + private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { + private final Map<Object, Object> stored = new HashMap<>(); + private int fetchCount = 0; + + @Override + public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + return false; + } + + @Override + public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException { + return null; + } + + @Override + public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException { + return false; + } + + @Override + public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + stored.put(key, value); + } + + @Override + @SuppressWarnings("unchecked") + public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { + fetchCount++; + return (V) stored.get(key); + } + + @Override + public void close() throws IOException { + } + + @Override + public <K> boolean remove(K key, Serializer<K> serializer) throws IOException { + final Object value = stored.remove(key); + return value != null; + } + } + + + private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> { + private final List<ListableEntity> entities = new ArrayList<>(); + + @Override + protected File getPersistenceFile() { + return new File("target/ListProcessor-local-state.json"); + } + + public void addEntity(final String name, final String identifier, final long timestamp) { + final ListableEntity entity = new ListableEntity() { + @Override + public String getName() { + return name; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public long getTimestamp() { + return timestamp; + } + }; + + entities.add(entity); + } + + @Override + protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) { + return Collections.emptyMap(); + } + + @Override + protected String getPath(final ProcessContext context) { + return "/path"; + } + + @Override + protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { + return Collections.unmodifiableList(entities); + } + + @Override + protected boolean isListingResetNecessary(PropertyDescriptor property) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java new file mode 100644 index 0000000..7aa8f9c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertFalse; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.standard.util.FileInfo; +import org.apache.nifi.processors.standard.util.FileTransfer; +import org.apache.nifi.processors.standard.util.PermissionDeniedException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestFetchFileTransfer { + + @Test + public void testContentFetched() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + + proc.addContent("hello.txt", "world".getBytes()); + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + assertFalse(proc.closed); + runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world"); + } + + @Test + public void testContentNotFound() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1); + } + + @Test + public void testInsufficientPermissions() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + + proc.addContent("hello.txt", "world".getBytes()); + proc.allowAccess = false; + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); + } + + private static class TestableFetchFileTransfer extends FetchFileTransfer { + private boolean allowAccess = true; + private boolean closed = false; + private final Map<String, byte[]> fileContents = new HashMap<>(); + + public void addContent(final String filename, final byte[] content) { + this.fileContents.put(filename, content); + } + + @Override + protected FileTransfer createFileTransfer(final ProcessContext context) { + return new FileTransfer() { + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public String getHomeDirectory(FlowFile flowFile) throws IOException { + return null; + } + + @Override + public List<FileInfo> getListing() throws IOException { + return null; + } + + @Override + public InputStream getInputStream(final String remoteFileName) throws IOException { + return getInputStream(remoteFileName, null); + } + + @Override + public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException { + if (!allowAccess) { + throw new PermissionDeniedException("test permission denied"); + } + + final byte[] content = fileContents.get(remoteFileName); + if (content == null) { + throw new FileNotFoundException(); + } + + return new ByteArrayInputStream(content); + } + + @Override + public void flush() throws IOException { + } + + @Override + public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException { + return null; + } + + @Override + public String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException { + return null; + } + + @Override + public void deleteFile(String path, String remoteFileName) throws IOException { + if (!fileContents.containsKey(remoteFileName)) { + throw new FileNotFoundException(); + } + + fileContents.remove(remoteFileName); + } + + @Override + public void deleteDirectory(String remoteDirectoryName) throws IOException { + + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public String getProtocolName() { + return "test"; + } + + @Override + public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException { + + } + }; + } + } +}