This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new d1579b4d21 NIFI-13121: Handle runtime exceptions in FetchHDFS
d1579b4d21 is described below

commit d1579b4d21335c1e200de079cde151f670764c01
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Wed May 1 16:52:57 2024 -0400

    NIFI-13121: Handle runtime exceptions in FetchHDFS
    
    Signed-off-by: Joe Gresock <jgres...@gmail.com>
    This closes #8727.
---
 .../apache/nifi/processors/hadoop/FetchHDFS.java   |  7 +++-
 .../nifi/processors/hadoop/TestFetchHDFS.java      | 19 +++++++++
 .../processors/hadoop/util/MockFileSystem.java     | 48 +++++++++++++++++++++-
 3 files changed, 72 insertions(+), 2 deletions(-)
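
For context on the fix: before this change, a RuntimeException raised while fetching the file escaped the processor's existing IOException handling, so the session rolled back and the FlowFile was retried indefinitely instead of being routed anywhere. The patch below catches NiFi's FlowFileAccessException (a RuntimeException the framework throws when FlowFile content cannot be read or written), penalizes the FlowFile, and routes it to comms.failure. A minimal sketch of the resulting control flow (simplified, not the literal processor source):

    try {
        // Copy the HDFS stream into the content repository; the framework
        // throws FlowFileAccessException (unchecked) if the read fails mid-copy.
        flowFile = session.importFrom(stream, flowFile);
        session.transfer(flowFile, REL_SUCCESS);
    } catch (final FlowFileAccessException ffae) {
        // New in this commit: treat the runtime failure like a comms failure.
        flowFile = session.penalize(flowFile);
        session.transfer(flowFile, REL_COMMS_FAILURE);
    }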

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index e026c01862..ce551acb75 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -43,6 +43,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
@@ -175,7 +176,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
                 outgoingFlowFile = session.putAttribute(outgoingFlowFile, CoreAttributes.FILENAME.key(), outputFilename);
 
                 stopWatch.stop();
-                getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
+                getLogger().info("Successfully received content from {} for {} in {}", qualifiedPath, outgoingFlowFile, stopWatch.getDuration());
                 outgoingFlowFile = session.putAttribute(outgoingFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                 session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                 session.transfer(outgoingFlowFile, getSuccessRelationship());
@@ -190,6 +191,10 @@ public class FetchHDFS extends AbstractHadoopProcessor {
                     outgoingFlowFile = session.penalize(outgoingFlowFile);
                     session.transfer(outgoingFlowFile, getCommsFailureRelationship());
                 }
+            } catch (FlowFileAccessException ffae) {
+                getLogger().error("Failed to retrieve content from HDFS for {}; routing to comms.failure", outgoingFlowFile, ffae);
+                outgoingFlowFile = session.penalize(outgoingFlowFile);
+                session.transfer(outgoingFlowFile, getCommsFailureRelationship());
             } finally {
                 IOUtils.closeQuietly(stream);
             }
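
A note on where FlowFileAccessException originates: it is not thrown by the HDFS client itself but by NiFi's process session when FlowFile content access fails, here potentially from the session.importFrom(stream, ...) call inside the try block (and, in the unit test below, the mock file system throws it directly from open()). Roughly, the framework-side behavior looks like this (a simplified sketch, not the actual StandardProcessSession source; copyToRepository is a hypothetical helper):

    public FlowFile importFrom(final InputStream source, final FlowFile destination) {
        try {
            return copyToRepository(source, destination);
        } catch (final IOException e) {
            // IO failures while streaming content surface to the processor
            // as an unchecked FlowFileAccessException.
            throw new FlowFileAccessException("Failed to import data", e);
        }
    }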
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 036904312c..9845c61e28 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -214,6 +214,25 @@ public class TestFetchHDFS {
         fileSystem.setFailOnOpen(false);
     }
 
+    @Test
+    public void testRuntimeException() {
+        MockFileSystem fileSystem = new MockFileSystem();
+        fileSystem.setRuntimeFailOnOpen(true);
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties, fileSystem);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(FetchHDFS.REL_FAILURE, 0);
+        runner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 1);
+        // assert that the flow file was penalized
+        runner.assertPenalizeCount(1);
+        fileSystem.setRuntimeFailOnOpen(false);
+    }
+
     private static class TestableFetchHDFS extends FetchHDFS {
         private final KerberosProperties testKerberosProps;
         private final FileSystem fileSystem;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
index f020f8b3b5..d5c196f0ee 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,6 +27,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.ietf.jgss.GSSException;
 
 import java.io.ByteArrayOutputStream;
@@ -49,6 +51,7 @@ public class MockFileSystem extends FileSystem {
     private final Map<Path, FSDataOutputStream> pathToOutputStream = new HashMap<>();
 
     private boolean failOnOpen;
+    private boolean runtimeFailOnOpen;
     private boolean failOnClose;
     private boolean failOnCreate;
     private boolean failOnFileStatus;
@@ -74,6 +77,10 @@ public class MockFileSystem extends FileSystem {
         this.failOnOpen = failOnOpen;
     }
 
+    public void setRuntimeFailOnOpen(final boolean runtimeFailOnOpen) {
+        this.runtimeFailOnOpen = runtimeFailOnOpen;
+    }
+
     public void setAcl(final Path path, final List<AclEntry> aclSpec) {
         pathToAcl.put(path, aclSpec);
     }
@@ -93,7 +100,10 @@ public class MockFileSystem extends FileSystem {
         if (failOnOpen) {
             throw new IOException(new GSSException(13));
         }
-        return null;
+        if (runtimeFailOnOpen) {
+            throw new FlowFileAccessException("runtime");
+        }
+        return createInputStream(f);
     }
 
     @Override
@@ -190,6 +200,19 @@ public class MockFileSystem extends FileSystem {
         return pathToStatus.containsKey(f);
     }
 
+    private FSDataInputStream createInputStream(final Path f) throws IOException {
+        if(failOnClose) {
+            return new FSDataInputStream(new StubFSInputStream()) {
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                    throw new IOException("Fail on close");
+                }
+            };
+        } else {
+            return new FSDataInputStream(new StubFSInputStream());
+        }
+    }
     private FSDataOutputStream createOutputStream() {
         if(failOnClose) {
             return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
@@ -294,4 +317,27 @@ public class MockFileSystem extends FileSystem {
     private static FsPermission perms(short p) {
         return new FsPermission(p);
     }
+
+    private class StubFSInputStream extends FSInputStream {
+
+        @Override
+        public void seek(long l) throws IOException {
+
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return 0;
+        }
+
+        @Override
+        public boolean seekToNewSource(long l) throws IOException {
+            return true;
+        }
+
+        @Override
+        public int read() throws IOException {
+            return -1;
+        }
+    }
 }
\ No newline at end of file
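
A closing note on StubFSInputStream: the stub extends FSInputStream rather than wrapping a plain java.io stream because FSDataInputStream's constructor requires the wrapped stream to implement Hadoop's Seekable and PositionedReadable interfaces and rejects anything else. FSInputStream implements both, leaving only seek/getPos/seekToNewSource/read to stub out. A minimal demonstration of the constraint (assuming Hadoop on the classpath):

    import java.io.ByteArrayInputStream;
    import org.apache.hadoop.fs.FSDataInputStream;

    // Throws IllegalArgumentException: a ByteArrayInputStream is neither
    // Seekable nor PositionedReadable, so FSDataInputStream rejects it.
    new FSDataInputStream(new ByteArrayInputStream(new byte[0]));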
