Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/416#discussion_r63419802
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SCPTransfer.java
 ---
    @@ -0,0 +1,682 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard.util;
    +
    +import java.io.BufferedWriter;
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.List;
    +import java.util.Locale;
    +import java.util.Properties;
    +import java.util.Vector;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.slf4j.LoggerFactory;
    +
    +import com.jcraft.jsch.ChannelSftp;
    +import com.jcraft.jsch.ChannelSftp.LsEntry;
    +import com.jcraft.jsch.ChannelSftp.LsEntrySelector;
    +import com.jcraft.jsch.JSch;
    +import com.jcraft.jsch.JSchException;
    +import com.jcraft.jsch.Session;
    +import com.jcraft.jsch.SftpException;
    +
    +
    +public class SCPTransfer implements FileTransfer {
    +
    +    public static final PropertyDescriptor PRIVATE_KEY_PATH = new 
PropertyDescriptor.Builder()
    +            .name("Private Key Path")
    +            .description("The fully qualified path to the Private Key 
file")
    +            .required(false)
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new 
PropertyDescriptor.Builder()
    +            .name("Private Key Passphrase")
    +            .description("Password for the private key")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .sensitive(true)
    +            .build();
    +    public static final PropertyDescriptor HOST_KEY_FILE = new 
PropertyDescriptor.Builder()
    +            .name("Host Key File")
    +            .description("If supplied, the given file will be used as the 
Host Key; otherwise, no use host key file will be used")
    +            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +            .required(false)
    +            .build();
    +    public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new 
PropertyDescriptor.Builder()
    +            .name("Strict Host Key Checking")
    +            .description("Indicates whether or not strict enforcement of 
hosts keys should be applied")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor PORT = new 
PropertyDescriptor.Builder()
    +            .name("Port")
    +            .description("The port that the remote system is listening on 
for file transfers")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .required(true)
    +            .defaultValue("22")
    +            .build();
    +    public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("Send Keep Alive On Timeout")
    +            .description("Indicates whether or not to send a single Keep 
Alive message when SSH socket times out")
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .required(true)
    +            .build();
    +
    +    /**
    +     * Dynamic property which is used to decide if the {@link 
#ensureDirectoryExists(FlowFile, File)} method should perform a {@link 
ChannelSftp#ls(String)} before calling
    +     * {@link ChannelSftp#mkdir(String)}. In most cases, the code should 
call ls before mkdir, but some weird permission setups (chmod 100) on a 
directory would cause the 'ls' to throw a permission
    +     * exception.
    +     * <p>
    +     * This property is dynamic until deemed a worthy inclusion as proper.
    +     */
    +    public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new 
PropertyDescriptor.Builder()
    +            .name("Disable Directory Listing")
    +            .description("Disables directory listings before operations 
which might fail, such as configurations which create directory structures.")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .dynamic(true)
    +            .defaultValue("false")
    +            .build();
    +
    +    private final ProcessorLog logger;
    +
    +    private final ProcessContext ctx;
    +    private Session session;
    +    private ChannelSftp sftp;
    +    private boolean closed = false;
    +    private String homeDir;
    +
    +    private final boolean disableDirectoryListing;
    +
    +    public SCPTransfer(final ProcessContext processContext, final 
ProcessorLog logger) {
    +           
    +        this.ctx = processContext;
    +        this.logger = logger;
    +  
    +        final PropertyValue disableListing = 
processContext.getProperty(DISABLE_DIRECTORY_LISTING);
    +        disableDirectoryListing = disableListing == null ? false : 
Boolean.TRUE.equals(disableListing.asBoolean());
    +    }
    +
    +    @Override
    +    public String getProtocolName() {
    +           return "scp";
    +    }
    +
    +    @Override
    +    public List<FileInfo> getListing() throws IOException {
    +        final String path = 
ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
    +        final int depth = 0;
    +       // final int maxResults = 
ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
    +       final int maxResults;
    +        final PropertyValue batchSizeValue = 
ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE);
    +        if (batchSizeValue == null) {
    +            maxResults = Integer.MAX_VALUE;
    +        } else {
    +            final Integer configuredValue = batchSizeValue.asInteger();
    +            maxResults = configuredValue == null ? Integer.MAX_VALUE : 
configuredValue;
    +        }
    +
    +        final List<FileInfo> listing = new ArrayList<>(1000);
    +        getListing(path, depth, maxResults, listing);
    +        return listing;
    +    }
    +
    +    private void getListing(final String path, final int depth, final int 
maxResults, final List<FileInfo> listing) throws IOException {
    +        if (maxResults < 1 || listing.size() >= maxResults) {
    +            return;
    +        }
    +
    +        if (depth >= 100) {
    +            logger.warn(this + " had to stop recursively searching 
directories at a recursive depth of " + depth + " to avoid memory issues");
    +            return;
    +        }
    +
    +        final boolean ignoreDottedFiles = 
ctx.getProperty(FileTransfer.IGNORE_DOTTED_FILES).asBoolean();
    +        final boolean recurse = 
ctx.getProperty(FileTransfer.RECURSIVE_SEARCH).asBoolean();
    +        final String fileFilterRegex = 
ctx.getProperty(FileTransfer.FILE_FILTER_REGEX).getValue();
    +        final Pattern pattern = (fileFilterRegex == null) ? null : 
Pattern.compile(fileFilterRegex);
    +        final String pathFilterRegex = 
ctx.getProperty(FileTransfer.PATH_FILTER_REGEX).getValue();
    +        final Pattern pathPattern = (!recurse || pathFilterRegex == null) 
? null : Pattern.compile(pathFilterRegex);
    +        final String remotePath = 
ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
    +
    +        // check if this directory path matches the PATH_FILTER_REGEX
    +        boolean pathFilterMatches = true;
    +        if (pathPattern != null) {
    +            Path reldir = path == null ? Paths.get(".") : Paths.get(path);
    +            if (remotePath != null) {
    +                reldir = Paths.get(remotePath).relativize(reldir);
    +            }
    +            if (reldir != null && !reldir.toString().isEmpty()) {
    +                if (!pathPattern.matcher(reldir.toString().replace("\\", 
"/")).matches()) {
    +                    pathFilterMatches = false;
    +                }
    +            }
    +        }
    +
    +        final ChannelSftp sftp = getChannel(null);
    +        final boolean isPathMatch = pathFilterMatches;
    +
    +        final List<LsEntry> subDirs = new ArrayList<>();
    +        try {
    +            final LsEntrySelector filter = new LsEntrySelector() {
    +                @Override
    +                public int select(final LsEntry entry) {
    +                    final String entryFilename = entry.getFilename();
    +
    +                    // skip over 'this directory' and 'parent directory' 
special
    +                    // files regardless of ignoring dot files
    +                    if (entryFilename.equals(".") || 
entryFilename.equals("..")) {
    +                        return LsEntrySelector.CONTINUE;
    +                    }
    +
    +                    // skip files and directories that begin with a dot if 
we're
    +                    // ignoring them
    +                    if (ignoreDottedFiles && 
entryFilename.startsWith(".")) {
    +                        return LsEntrySelector.CONTINUE;
    +                    }
    +
    +                    // if is a directory and we're supposed to recurse
    +                    if (recurse && entry.getAttrs().isDir()) {
    +                        subDirs.add(entry);
    +                        return LsEntrySelector.CONTINUE;
    +                    }
    +
    +                    // if is not a directory and is not a link and it 
matches
    +                    // FILE_FILTER_REGEX - then let's add it
    +                    if (!entry.getAttrs().isDir() && 
!entry.getAttrs().isLink() && isPathMatch) {
    +                        if (pattern == null || 
pattern.matcher(entryFilename).matches()) {
    +                            listing.add(newFileInfo(entry, path));
    +                        }
    +                    }
    +
    +                    if (listing.size() >= maxResults) {
    +                        return LsEntrySelector.BREAK;
    +                    }
    +
    +                    return LsEntrySelector.CONTINUE;
    +                }
    +
    +            };
    +
    +            if (path == null || path.trim().isEmpty()) {
    +                sftp.ls(".", filter);
    +            } else {
    +                sftp.ls(path, filter);
    +            }
    +        } catch (final SftpException e) {
    +            final String pathDesc = path == null ? "current directory" : 
path;
    +            switch (e.id) {
    +                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
    +                    throw new FileNotFoundException("Could not perform 
listing on " + pathDesc + " because could not find the file on the remote 
server");
    +                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
    +                    throw new PermissionDeniedException("Could not perform 
listing on " + pathDesc + " due to insufficient permissions");
    +                default:
    +                    throw new IOException("Failed to obtain file listing 
for " + pathDesc, e);
    +            }
    +        }
    +
    +        for (final LsEntry entry : subDirs) {
    +            final String entryFilename = entry.getFilename();
    +            final File newFullPath = new File(path, entryFilename);
    +            final String newFullForwardPath = 
newFullPath.getPath().replace("\\", "/");
    +
    +            try {
    +                getListing(newFullForwardPath, depth + 1, maxResults, 
listing);
    +            } catch (final IOException e) {
    +                logger.error("Unable to get listing from " + 
newFullForwardPath + "; skipping this subdirectory");
    +            }
    +        }
    +    }
    +
    +    private FileInfo newFileInfo(final LsEntry entry, String path) {
    +        if (entry == null) {
    +            return null;
    +        }
    +        final File newFullPath = new File(path, entry.getFilename());
    +        final String newFullForwardPath = 
newFullPath.getPath().replace("\\", "/");
    +
    +        String perms = entry.getAttrs().getPermissionsString();
    +        if (perms.length() > 9) {
    +            perms = perms.substring(perms.length() - 9);
    +        }
    +
    +        FileInfo.Builder builder = new FileInfo.Builder()
    +            .filename(entry.getFilename())
    +            .fullPathFileName(newFullForwardPath)
    +            .directory(entry.getAttrs().isDir())
    +            .size(entry.getAttrs().getSize())
    +            .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
    +            .permissions(perms)
    +            .owner(Integer.toString(entry.getAttrs().getUId()))
    +            .group(Integer.toString(entry.getAttrs().getGId()));
    +        return builder.build();
    +    }
    +
    +    @Override
    +    public InputStream getInputStream(final String remoteFileName) throws 
IOException {
    +        return getInputStream(remoteFileName, null);
    +    }
    +
    +    @Override
    +    public InputStream getInputStream(final String remoteFileName, final 
FlowFile flowFile) throws IOException {
    +        final ChannelSftp sftp = getChannel(flowFile);
    +        try {
    +            return sftp.get(remoteFileName);
    +        } catch (final SftpException e) {
    +            switch (e.id) {
    +                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
    +                    throw new FileNotFoundException("Could not find file " 
+ remoteFileName + " on remote SFTP Server");
    +                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
    +                    throw new PermissionDeniedException("Insufficient 
permissions to read file " + remoteFileName + " from remote SFTP Server", e);
    +                default:
    +                    throw new IOException("Failed to obtain file content 
for " + remoteFileName, e);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void flush() throws IOException {
    +        // nothing needed here
    +    }
    +
    +    @Override
    +    public void deleteFile(final String path, final String remoteFileName) 
throws IOException {
    +        final String fullPath = (path == null) ? remoteFileName : 
(path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName;
    +        try {
    +            sftp.rm(fullPath);
    +        } catch (final SftpException e) {
    +            switch (e.id) {
    +                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
    +                    throw new FileNotFoundException("Could not find file " 
+ remoteFileName + " to remove from remote SFTP Server");
    +                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
    +                    throw new PermissionDeniedException("Insufficient 
permissions to delete file " + remoteFileName + " from remote SFTP Server", e);
    +                default:
    +                    throw new IOException("Failed to delete remote file " 
+ fullPath, e);
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void deleteDirectory(final String remoteDirectoryName) throws 
IOException {
    +        try {
    +            sftp.rm(remoteDirectoryName);
    +        } catch (final SftpException e) {
    +            throw new IOException("Failed to delete remote directory " + 
remoteDirectoryName, e);
    +        }
    +    }
    +
    +    @Override
    +    public void ensureDirectoryExists(final FlowFile flowFile, final File 
directoryName) throws IOException {
    +        final ChannelSftp channel = getChannel(flowFile);
    +        final String remoteDirectory = 
directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", "");
    +
    +        // if we disable the directory listing, we just want to blindly 
perform the mkdir command,
    +        // eating any exceptions thrown (like if the directory already 
exists).
    +        if (disableDirectoryListing) {
    +            try {
    +                channel.mkdir(remoteDirectory);
    +            } catch (SftpException e) {
    +                if (e.id != ChannelSftp.SSH_FX_FAILURE) {
    +                    throw new IOException("Could not blindly create remote 
directory due to " + e.getMessage(), e);
    +                }
    +            }
    +            return;
    +        }
    +        // end if disableDirectoryListing
    +
    +        boolean exists;
    +        try {
    +            channel.stat(remoteDirectory);
    +            exists = true;
    +        } catch (final SftpException e) {
    +            if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
    +                // No Such File
    +                exists = false;
    +            } else {
    +                throw new IOException("Failed to determine if remote 
directory exists at " + remoteDirectory + " due to " + e, e);
    +            }
    +        }
    +
    +        if (!exists) {
    +            // first ensure parent directories exist before creating this 
one
    +            if (directoryName.getParent() != null && 
!directoryName.getParentFile().equals(new File(File.separator))) {
    +                ensureDirectoryExists(flowFile, 
directoryName.getParentFile());
    +            }
    +            logger.debug("Remote Directory {} does not exist; creating 
it", new Object[] {remoteDirectory});
    +            try {
    +                channel.mkdir(remoteDirectory);
    +                logger.debug("Created {}", new Object[] {remoteDirectory});
    +            } catch (final SftpException e) {
    +                throw new IOException("Failed to create remote directory " 
+ remoteDirectory + " due to " + e, e);
    +            }
    +        }
    +    }
    +
    +    private ChannelSftp getChannel(final FlowFile flowFile) throws 
IOException {
    +        if (sftp != null) {
    +            String sessionhost = session.getHost();
    +            String desthost = 
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
    +            if (sessionhost.equals(desthost)) {
    +                // destination matches so we can keep our current session
    +                return sftp;
    +            } else {
    +                // this flowFile is going to a different destination, 
reset session
    +                close();
    +            }
    +        }
    +
    +        final JSch jsch = new JSch();
    --- End diff --
    
    Ironically they do output a lot of interesting info in the logs, so having it 
there and configurable could provide a lot of help in determining the problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to