This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 0bdc392462 NIFI-12249 FetchFTP and FetchSFTP set fetch.failure.reason 
on failures
0bdc392462 is described below

commit 0bdc392462a37c99028fe302d5566bfb2691f39c
Author: annanys23 <anna...@gmail.com>
AuthorDate: Wed Oct 25 00:24:27 2023 +0000

    NIFI-12249 FetchFTP and FetchSFTP set fetch.failure.reason on failures
    
    - Set fetch.failure.reason to relationship name when routing to failure 
relationships
    
    This closes #7929
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
    (cherry picked from commit 90498a352d059376b96630c1c3f1136313999d17)
---
 .../apache/nifi/processors/standard/FetchFTP.java  |  3 +-
 .../processors/standard/FetchFileTransfer.java     | 43 +++++++++++++---------
 .../apache/nifi/processors/standard/FetchSFTP.java |  3 +-
 .../apache/nifi/processors/standard/TestFTP.java   |  2 +
 .../nifi/processors/standard/TestFetchFTP.java     | 11 ++++++
 5 files changed, 42 insertions(+), 20 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
index cf6df69d1e..788b5bb179 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
@@ -44,7 +44,8 @@ import org.apache.nifi.processors.standard.util.FTPTransfer;
     @WritesAttribute(attribute = "ftp.remote.port", description = "The port 
that was used to communicate with the remote FTP server"),
     @WritesAttribute(attribute = "ftp.remote.filename", description = "The 
name of the remote file that was pulled"),
     @WritesAttribute(attribute = "filename", description = "The filename is 
updated to point to the filename fo the remote file"),
-    @WritesAttribute(attribute = "path", description = "If the Remote File 
contains a directory name, that directory name will be added to the FlowFile 
using the 'path' attribute")
+    @WritesAttribute(attribute = "path", description = "If the Remote File 
contains a directory name, that directory name will be added to the FlowFile 
using the 'path' attribute"),
+    @WritesAttribute(attribute = "fetch.failure.reason", description = "The 
name of the failure relationship applied when routing to any failure 
relationship")
 })
 public class FetchFTP extends FetchFileTransfer {
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 227904282d..1e8efba2c0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -61,7 +61,7 @@ public abstract class FetchFileTransfer extends 
AbstractProcessor {
     static final AllowableValue COMPLETION_NONE = new AllowableValue("None", 
"None", "Leave the file as-is");
     static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move 
File", "Move File", "Move the file to the directory specified by the <Move 
Destination Directory> property");
     static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete 
File", "Delete File", "Deletes the original file from the remote system");
-
+    static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason";
 
     static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
         .name("Hostname")
@@ -254,36 +254,33 @@ public abstract class FetchFileTransfer extends 
AbstractProcessor {
             transfer = transferWrapper.getFileTransfer();
         }
 
+        Relationship failureRelationship = null;
+        boolean closeConnOnFailure = false;
+
         try {
             // Pull data from remote system.
             try {
                 flowFile = transfer.getRemoteFile(filename, flowFile, session);
-
             } catch (final FileNotFoundException e) {
+                failureRelationship = REL_NOT_FOUND;
                 getLogger().log(levelFileNotFound, "Failed to fetch content 
for {} from filename {} on remote host {} because the file could not be found 
on the remote system; routing to {}",
-                        flowFile, filename, host, REL_NOT_FOUND.getName());
-                session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
-                session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
-                cleanupTransfer(transfer, false, transferQueue, host, port);
-                return;
+                        flowFile, filename, host, 
failureRelationship.getName());
             } catch (final PermissionDeniedException e) {
+                failureRelationship = REL_PERMISSION_DENIED;
                 getLogger().error("Failed to fetch content for {} from 
filename {} on remote host {} due to insufficient permissions; routing to {}",
-                        flowFile, filename, host, 
REL_PERMISSION_DENIED.getName());
-                session.transfer(session.penalize(flowFile), 
REL_PERMISSION_DENIED);
-                session.getProvenanceReporter().route(flowFile, 
REL_PERMISSION_DENIED);
-                cleanupTransfer(transfer, false, transferQueue, host, port);
-                return;
+                        flowFile, filename, host, 
failureRelationship.getName());
             } catch (final ProcessException | IOException e) {
-                getLogger().error("Failed to fetch content for {} from 
filename {} on remote host {}:{} due to {}; routing to comms.failure",
-                        flowFile, filename, host, port, e.toString(), e);
-                session.transfer(session.penalize(flowFile), 
REL_COMMS_FAILURE);
-                cleanupTransfer(transfer, true, transferQueue, host, port);
-                return;
+                failureRelationship = REL_COMMS_FAILURE;
+                getLogger().error("Failed to fetch content for {} from 
filename {} on remote host {}:{} due to {}; routing to {}",
+                        flowFile, filename, host, port, e.toString(), 
failureRelationship.getName(), e);
+
+                closeConnOnFailure = true;
             }
 
             // Add FlowFile attributes
-            final String protocolName = transfer.getProtocolName();
             final Map<String, String> attributes = new HashMap<>();
+            final String protocolName = transfer.getProtocolName();
+
             attributes.put(protocolName + ".remote.host", host);
             attributes.put(protocolName + ".remote.port", 
String.valueOf(port));
             attributes.put(protocolName + ".remote.filename", filename);
@@ -296,6 +293,16 @@ public abstract class FetchFileTransfer extends 
AbstractProcessor {
             } else {
                 attributes.put(CoreAttributes.FILENAME.key(), filename);
             }
+
+            if (failureRelationship != null) {
+                attributes.put(FAILURE_REASON_ATTRIBUTE, 
failureRelationship.getName());
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(session.penalize(flowFile), 
failureRelationship);
+                session.getProvenanceReporter().route(flowFile, 
failureRelationship);
+                cleanupTransfer(transfer, closeConnOnFailure, transferQueue, 
host, port);
+                return;
+            }
+
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             // emit provenance event and transfer FlowFile
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 69281e5695..e0d78622e6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -46,7 +46,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
     @WritesAttribute(attribute = "sftp.remote.port", description = "The port 
that was used to communicate with the remote SFTP server"),
     @WritesAttribute(attribute = "sftp.remote.filename", description = "The 
name of the remote file that was pulled"),
     @WritesAttribute(attribute = "filename", description = "The filename is 
updated to point to the filename fo the remote file"),
-    @WritesAttribute(attribute = "path", description = "If the Remote File 
contains a directory name, that directory name will be added to the FlowFile 
using the 'path' attribute")
+    @WritesAttribute(attribute = "path", description = "If the Remote File 
contains a directory name, that directory name will be added to the FlowFile 
using the 'path' attribute"),
+    @WritesAttribute(attribute = "fetch.failure.reason", description = "The 
name of the failure relationship applied when routing to any failure 
relationship")
 })
 public class FetchSFTP extends FetchFileTransfer {
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
index cb405d31b2..94608c93bd 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
@@ -269,6 +269,7 @@ public class TestFTP {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(FetchFTP.REL_NOT_FOUND);
+        
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
     }
 
     @Test
@@ -290,6 +291,7 @@ public class TestFTP {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(FetchFTP.REL_PERMISSION_DENIED);
+        
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
     }
 
     @Test
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
index 5f636880ec..4fcdf7a21a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java
@@ -113,6 +113,9 @@ public class TestFetchFTP {
 
         runner.run(1, false, false);
         runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 
1);
+        
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
+        MockFlowFile transferredFlowFile = 
runner.getPenalizedFlowFiles().get(0);
+        assertEquals(FetchFileTransfer.REL_NOT_FOUND.getName(), 
transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
     }
 
     @Test
@@ -122,6 +125,9 @@ public class TestFetchFTP {
 
         runner.run(1, false, false);
         
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 
1);
+        
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
+        MockFlowFile transferredFlowFile = 
runner.getPenalizedFlowFiles().get(0);
+        assertEquals(FetchFileTransfer.REL_PERMISSION_DENIED.getName(), 
transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
     }
 
     @Test
@@ -132,6 +138,7 @@ public class TestFetchFTP {
 
         runner.run(2, false, false);
         
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 
2);
+        
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
 
         assertEquals(1, proc.numberOfFileTransfers);
         assertFalse(proc.isClosed);
@@ -145,6 +152,7 @@ public class TestFetchFTP {
 
         runner.run(2, false, false);
         runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 
2);
+        
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
 
         assertEquals(1, proc.numberOfFileTransfers);
         assertFalse(proc.isClosed);
@@ -157,6 +165,9 @@ public class TestFetchFTP {
 
         runner.run(1, false, false);
         
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1);
+        
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
+        MockFlowFile transferredFlowFile = 
runner.getPenalizedFlowFiles().get(0);
+        assertEquals(FetchFileTransfer.REL_COMMS_FAILURE.getName(), 
transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
 
         assertTrue(proc.isClosed);
     }

Reply via email to