This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 0bdc392462 NIFI-12249 FetchFTP and FetchSFTP set fetch.failure.reason on failures 0bdc392462 is described below commit 0bdc392462a37c99028fe302d5566bfb2691f39c Author: annanys23 <anna...@gmail.com> AuthorDate: Wed Oct 25 00:24:27 2023 +0000 NIFI-12249 FetchFTP and FetchSFTP set fetch.failure.reason on failures - Set fetch.failure.reason to relationship name when routing to failure relationships This closes #7929 Signed-off-by: David Handermann <exceptionfact...@apache.org> (cherry picked from commit 90498a352d059376b96630c1c3f1136313999d17) --- .../apache/nifi/processors/standard/FetchFTP.java | 3 +- .../processors/standard/FetchFileTransfer.java | 43 +++++++++++++--------- .../apache/nifi/processors/standard/FetchSFTP.java | 3 +- .../apache/nifi/processors/standard/TestFTP.java | 2 + .../nifi/processors/standard/TestFetchFTP.java | 11 ++++++ 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index cf6df69d1e..788b5bb179 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -44,7 +44,8 @@ import org.apache.nifi.processors.standard.util.FTPTransfer; @WritesAttribute(attribute = "ftp.remote.port", description = "The port that was used to communicate with the remote FTP server"), @WritesAttribute(attribute = "ftp.remote.filename", description = "The name of the remote file that was pulled"), @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the 
remote file"), - @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") + @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute"), + @WritesAttribute(attribute = "fetch.failure.reason", description = "The name of the failure relationship applied when routing to any failure relationship") }) public class FetchFTP extends FetchFileTransfer { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 227904282d..1e8efba2c0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -61,7 +61,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is"); static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property"); static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system"); - + static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason"; static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() .name("Hostname") @@ -254,36 +254,33 @@ public abstract class FetchFileTransfer extends AbstractProcessor { transfer = 
transferWrapper.getFileTransfer(); } + Relationship failureRelationship = null; + boolean closeConnOnFailure = false; + try { // Pull data from remote system. try { flowFile = transfer.getRemoteFile(filename, flowFile, session); - } catch (final FileNotFoundException e) { + failureRelationship = REL_NOT_FOUND; getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", - flowFile, filename, host, REL_NOT_FOUND.getName()); - session.transfer(session.penalize(flowFile), REL_NOT_FOUND); - session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND); - cleanupTransfer(transfer, false, transferQueue, host, port); - return; + flowFile, filename, host, failureRelationship.getName()); } catch (final PermissionDeniedException e) { + failureRelationship = REL_PERMISSION_DENIED; getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", - flowFile, filename, host, REL_PERMISSION_DENIED.getName()); - session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); - session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); - cleanupTransfer(transfer, false, transferQueue, host, port); - return; + flowFile, filename, host, failureRelationship.getName()); } catch (final ProcessException | IOException e) { - getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure", - flowFile, filename, host, port, e.toString(), e); - session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE); - cleanupTransfer(transfer, true, transferQueue, host, port); - return; + failureRelationship = REL_COMMS_FAILURE; + getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to {}", + flowFile, filename, host, port, e.toString(), failureRelationship.getName(), e); + + 
closeConnOnFailure = true; } // Add FlowFile attributes - final String protocolName = transfer.getProtocolName(); final Map<String, String> attributes = new HashMap<>(); + final String protocolName = transfer.getProtocolName(); + attributes.put(protocolName + ".remote.host", host); attributes.put(protocolName + ".remote.port", String.valueOf(port)); attributes.put(protocolName + ".remote.filename", filename); @@ -296,6 +293,16 @@ public abstract class FetchFileTransfer extends AbstractProcessor { } else { attributes.put(CoreAttributes.FILENAME.key(), filename); } + + if (failureRelationship != null) { + attributes.put(FAILURE_REASON_ATTRIBUTE, failureRelationship.getName()); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(session.penalize(flowFile), failureRelationship); + session.getProvenanceReporter().route(flowFile, failureRelationship); + cleanupTransfer(transfer, closeConnOnFailure, transferQueue, host, port); + return; + } + flowFile = session.putAllAttributes(flowFile, attributes); // emit provenance event and transfer FlowFile diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 69281e5695..e0d78622e6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -46,7 +46,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"), @WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"), 
@WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), - @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") + @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute"), + @WritesAttribute(attribute = "fetch.failure.reason", description = "The name of the failure relationship applied when routing to any failure relationship") }) public class FetchSFTP extends FetchFileTransfer { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index cb405d31b2..94608c93bd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -269,6 +269,7 @@ public class TestFTP { runner.run(); runner.assertAllFlowFilesTransferred(FetchFTP.REL_NOT_FOUND); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); } @Test @@ -290,6 +291,7 @@ public class TestFTP { runner.run(); runner.assertAllFlowFilesTransferred(FetchFTP.REL_PERMISSION_DENIED); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java index 
5f636880ec..4fcdf7a21a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java @@ -113,6 +113,9 @@ public class TestFetchFTP { runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.REL_NOT_FOUND.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); } @Test @@ -122,6 +125,9 @@ public class TestFetchFTP { runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.REL_PERMISSION_DENIED.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); } @Test @@ -132,6 +138,7 @@ public class TestFetchFTP { runner.run(2, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 2); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); assertEquals(1, proc.numberOfFileTransfers); assertFalse(proc.isClosed); @@ -145,6 +152,7 @@ public class TestFetchFTP { runner.run(2, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 2); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); assertEquals(1, proc.numberOfFileTransfers); assertFalse(proc.isClosed); @@ -157,6 +165,9 @@ public class TestFetchFTP { runner.run(1, false, false); 
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.REL_COMMS_FAILURE.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); assertTrue(proc.isClosed); }