This is an automated email from the ASF dual-hosted git repository. jgresock pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new d1579b4d21 NIFI-13121: Handle runtime exceptions in FetchHDFS d1579b4d21 is described below commit d1579b4d21335c1e200de079cde151f670764c01 Author: Matt Burgess <mattyb...@apache.org> AuthorDate: Wed May 1 16:52:57 2024 -0400 NIFI-13121: Handle runtime exceptions in FetchHDFS Signed-off-by: Joe Gresock <jgres...@gmail.com> This closes #8727. --- .../apache/nifi/processors/hadoop/FetchHDFS.java | 7 +++- .../nifi/processors/hadoop/TestFetchHDFS.java | 19 +++++++++ .../processors/hadoop/util/MockFileSystem.java | 48 +++++++++++++++++++++- 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index e026c01862..ce551acb75 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -43,6 +43,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler; @@ -175,7 +176,7 @@ public class FetchHDFS extends AbstractHadoopProcessor { outgoingFlowFile = session.putAttribute(outgoingFlowFile, CoreAttributes.FILENAME.key(), outputFilename); stopWatch.stop(); - getLogger().info("Successfully received content from {} for {} in {}", new
Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()}); + getLogger().info("Successfully received content from {} for {} in {}", qualifiedPath, outgoingFlowFile, stopWatch.getDuration()); outgoingFlowFile = session.putAttribute(outgoingFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString()); session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS)); session.transfer(outgoingFlowFile, getSuccessRelationship()); @@ -190,6 +191,10 @@ public class FetchHDFS extends AbstractHadoopProcessor { outgoingFlowFile = session.penalize(outgoingFlowFile); session.transfer(outgoingFlowFile, getCommsFailureRelationship()); } + } catch (FlowFileAccessException ffae) { + getLogger().error("Failed to retrieve HDFS content for {}; routing to comms failure", outgoingFlowFile, ffae); + outgoingFlowFile = session.penalize(outgoingFlowFile); + session.transfer(outgoingFlowFile, getCommsFailureRelationship()); } finally { IOUtils.closeQuietly(stream); } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java index 036904312c..9845c61e28 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java @@ -214,6 +214,25 @@ public class TestFetchHDFS { fileSystem.setFailOnOpen(false); } + @Test + public void testRuntimeException() { + MockFileSystem fileSystem = new MockFileSystem(); + fileSystem.setRuntimeFailOnOpen(true); + FetchHDFS proc = new TestableFetchHDFS(kerberosProperties, fileSystem); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchHDFS.FILENAME,
"src/test/resources/testdata/randombytes-1.gz"); + runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE"); + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(FetchHDFS.REL_FAILURE, 0); + runner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 1); + // assert that the file was penalized + runner.assertPenalizeCount(1); + fileSystem.setRuntimeFailOnOpen(false); + } + private static class TestableFetchHDFS extends FetchHDFS { private final KerberosProperties testKerberosProps; private final FileSystem fileSystem; diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java index f020f8b3b5..d5c196f0ee 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,6 +27,7 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.util.Progressable; +import org.apache.nifi.processor.exception.FlowFileAccessException; import org.ietf.jgss.GSSException; import java.io.ByteArrayOutputStream; @@ -49,6 +51,7 @@ public class MockFileSystem extends FileSystem { private final Map<Path,
FSDataOutputStream> pathToOutputStream = new HashMap<>(); private boolean failOnOpen; + private boolean runtimeFailOnOpen; private boolean failOnClose; private boolean failOnCreate; private boolean failOnFileStatus; @@ -74,6 +77,10 @@ public class MockFileSystem extends FileSystem { this.failOnOpen = failOnOpen; } + public void setRuntimeFailOnOpen(final boolean runtimeFailOnOpen) { + this.runtimeFailOnOpen = runtimeFailOnOpen; + } + public void setAcl(final Path path, final List<AclEntry> aclSpec) { pathToAcl.put(path, aclSpec); } @@ -93,7 +100,10 @@ public class MockFileSystem extends FileSystem { if (failOnOpen) { throw new IOException(new GSSException(13)); } - return null; + if (runtimeFailOnOpen) { + throw new FlowFileAccessException("runtime"); + } + return createInputStream(f); } @Override @@ -190,6 +200,19 @@ public class MockFileSystem extends FileSystem { return pathToStatus.containsKey(f); } + private FSDataInputStream createInputStream(final Path f) throws IOException { + if(failOnClose) { + return new FSDataInputStream(new StubFSInputStream()) { + @Override + public void close() throws IOException { + super.close(); + throw new IOException("Fail on close"); + } + }; + } else { + return new FSDataInputStream(new StubFSInputStream()); + } + } private FSDataOutputStream createOutputStream() { if(failOnClose) { return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) { @@ -294,4 +317,27 @@ public class MockFileSystem extends FileSystem { private static FsPermission perms(short p) { return new FsPermission(p); } + + private static class StubFSInputStream extends FSInputStream { + + @Override + public void seek(long l) throws IOException { + + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public int read() throws IOException { + return -1; + } + } } \ No newline at end of file