This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new b04b9b2213 NIFI-12889 - Retry Kerberos login on auth failure in HDFS processors
b04b9b2213 is described below

commit b04b9b2213f902d8fd912ab3ba950875deb7e9e4
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Tue Apr 9 13:26:43 2024 -0400

    NIFI-12889 - Retry Kerberos login on auth failure in HDFS processors
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8618.
---
 .../processors/hadoop/AbstractHadoopProcessor.java |  46 +++-
 .../apache/nifi/processors/hadoop/DeleteHDFS.java  |  33 ++-
 .../apache/nifi/processors/hadoop/FetchHDFS.java   | 100 ++++----
 .../org/apache/nifi/processors/hadoop/GetHDFS.java |  21 +-
 .../nifi/processors/hadoop/GetHDFSFileInfo.java    |  16 +-
 .../processors/hadoop/GetHDFSSequenceFile.java     |  19 +-
 .../apache/nifi/processors/hadoop/MoveHDFS.java    | 180 +++++++-------
 .../org/apache/nifi/processors/hadoop/PutHDFS.java | 126 ++++------
 .../GSSExceptionRollbackYieldSessionHandler.java   |  30 +++
 .../processors/hadoop/GetHDFSSequenceFileTest.java |  57 ++++-
 .../apache/nifi/processors/hadoop/GetHDFSTest.java |  39 ++-
 .../nifi/processors/hadoop/MoveHDFSTest.java       |  62 ++++-
 .../apache/nifi/processors/hadoop/PutHDFSTest.java | 191 +++-----------
 .../nifi/processors/hadoop/TestDeleteHDFS.java     |  20 +-
 .../nifi/processors/hadoop/TestFetchHDFS.java      |  62 +++--
 .../processors/hadoop/TestGetHDFSFileInfo.java     | 184 +++-----------
 .../processors/hadoop/util/MockFileSystem.java     | 273 +++++++++++++++++++++
 17 files changed, 854 insertions(+), 605 deletions(-)
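
The pattern introduced by this commit is the same across every processor touched below: the catch block first offers the caught Throwable to handleAuthErrors(), and only falls through to the previous failure handling when no Kerberos credential error is found in the causal chain. A minimal sketch of the intended call site follows; MyHdfsProcessor-style names and doHdfsWork() are illustrative, only handleAuthErrors() and GSSExceptionRollbackYieldSessionHandler come from this commit:

    // Illustrative sketch of a processor using the new hook; doHdfsWork() is hypothetical.
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        try {
            doHdfsWork();
        } catch (final IOException e) {
            // true => a GSSException with major code NO_CRED was found: the session has
            // been rolled back, the context yielded, and the HDFS resources reset so the
            // next trigger performs a fresh Kerberos login
            if (handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
                return;
            }
            getLogger().error("HDFS operation failed", e); // non-auth errors keep their old handling
        }
    }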

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 960e7a5b7c..12b9bbc68c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,11 +43,13 @@ import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.krb.KerberosKeytabUser;
 import org.apache.nifi.security.krb.KerberosPasswordUser;
 import org.apache.nifi.security.krb.KerberosUser;
+import org.ietf.jgss.GSSException;
 
 import javax.net.SocketFactory;
 import java.io.File;
@@ -62,7 +65,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
@@ -171,7 +177,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
 
     // variables shared by all threads of this processor
     // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
-    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
+    final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
 
     // Holder of cached Configuration information so validation does not reload the same config over and over
     private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
@@ -532,12 +538,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
 
     protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
         try {
-            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-                @Override
-                public FileSystem run() throws Exception {
-                    return FileSystem.get(config);
-                }
-            });
+            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
         } catch (InterruptedException e) {
             throw new IOException("Unable to create file system: " + e.getMessage());
         }
@@ -703,4 +704,35 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
 
         return new Path(path.replaceAll("/+", "/"));
     }
+
+    /**
+     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
+     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
+     * @param t The throwable to inspect for the cause.
+     * @param expectedCauseType The cause type to search for in the causal chain.
+     * @param causePredicate A predicate that the matching cause must also satisfy.
+     * @return an {@link Optional} containing the first matching cause, or {@link Optional#empty()} if none is found
+     */
+    protected <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
+        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
+        return causalChain
+                .filter(expectedCauseType::isInstance)
+                .map(expectedCauseType::cast)
+                .filter(causePredicate)
+                .findFirst();
+    }
+
+    protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context, BiConsumer<ProcessSession, ProcessContext> sessionHandler) {
+        Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+        if (causeOptional.isPresent()) {
+            getLogger().error("An error occurred while connecting to HDFS. Rolling back session and resetting HDFS resources", causeOptional.get());
+            try {
+                hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
+            } catch (IOException ioe) {
+                getLogger().error("An error occurred resetting HDFS resources; you may need to restart the processor.", ioe);
+            }
+            sessionHandler.accept(session, context);
+            return true;
+        }
+        return false;
+    }
 }
\ No newline at end of file
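
For reference, findCause() above walks Guava's Throwables.getCausalChain(), so the GSSException may sit arbitrarily deep under wrapper exceptions. A small illustrative check, as it might be written from within a processor subclass (the nesting here is made up, but mirrors how Hadoop clients typically wrap auth failures):

    // Hypothetical nesting for illustration only.
    final GSSException gsse = new GSSException(GSSException.NO_CRED);
    final IOException wrapped = new IOException("Failed on local exception",
            new RuntimeException(gsse));

    // Returns the first GSSException in the causal chain whose major code is NO_CRED.
    final Optional<GSSException> cause = findCause(wrapped, GSSException.class,
            e -> GSSException.NO_CRED == e.getMajor());
    assert cause.isPresent();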
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 0cab0b0bdc..a22991eed8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -39,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
 
 import java.io.IOException;
 import java.security.PrivilegedAction;
@@ -177,16 +178,20 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                             flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                             session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
                         } catch (IOException ioe) {
-                            // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
-                            // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-                            getLogger().warn("Failed to delete file or directory", ioe);
-
-                            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
-                            // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
-                            attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
-
-                            session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
-                            failedPath++;
+                            if (handleAuthErrors(ioe, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                                return null;
+                            } else {
+                                // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
+                                // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+                                getLogger().warn("Failed to delete file or directory", ioe);
+
+                                Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
+                                // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
+                                attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
+
+                                session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
+                                failedPath++;
+                            }
                         }
                     }
                 }
@@ -198,8 +203,12 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                     session.remove(flowFile);
                 }
             } catch (IOException e) {
-                getLogger().error("Error processing delete for flowfile {} due 
to {}", flowFile, e.getMessage(), e);
-                session.transfer(flowFile, getFailureRelationship());
+                if (handleAuthErrors(e, session, context, new 
GSSExceptionRollbackYieldSessionHandler())) {
+                    return null;
+                } else {
+                    getLogger().error("Error processing delete for flowfile {} 
due to {}", flowFile, e.getMessage(), e);
+                    session.transfer(flowFile, getFailureRelationship());
+                }
             }
 
             return null;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 2a4986cadd..e026c01862 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.FileNotFoundException;
@@ -141,60 +142,59 @@ public class FetchHDFS extends AbstractHadoopProcessor {
         final StopWatch stopWatch = new StopWatch(true);
         final FlowFile finalFlowFile = flowFile;
 
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                InputStream stream = null;
-                CompressionCodec codec = null;
-                Configuration conf = getConfiguration();
-                final CompressionCodecFactory compressionCodecFactory = new 
CompressionCodecFactory(conf);
-                final CompressionType compressionType = 
getCompressionType(context);
-                final boolean inferCompressionCodec = compressionType == 
CompressionType.AUTOMATIC;
-
-                if(inferCompressionCodec) {
-                    codec = compressionCodecFactory.getCodec(path);
-                } else if (compressionType != CompressionType.NONE) {
-                    codec = getCompressionCodec(context, getConfiguration());
-                }
+        ugi.doAs((PrivilegedAction<Object>) () -> {
+            InputStream stream = null;
+            CompressionCodec codec = null;
+            Configuration conf = getConfiguration();
+            final CompressionCodecFactory compressionCodecFactory = new 
CompressionCodecFactory(conf);
+            final CompressionType compressionType = 
getCompressionType(context);
+            final boolean inferCompressionCodec = compressionType == 
CompressionType.AUTOMATIC;
+
+            if (inferCompressionCodec) {
+                codec = compressionCodecFactory.getCodec(path);
+            } else if (compressionType != CompressionType.NONE) {
+                codec = getCompressionCodec(context, getConfiguration());
+            }
 
-                FlowFile flowFile = finalFlowFile;
-                final Path qualifiedPath = path.makeQualified(hdfs.getUri(), 
hdfs.getWorkingDirectory());
-                try {
-                    final String outputFilename;
-                    final String originalFilename = path.getName();
-                    stream = hdfs.open(path, 16384);
-
-                    // Check if compression codec is defined (inferred or 
otherwise)
-                    if (codec != null) {
-                        stream = codec.createInputStream(stream);
-                        outputFilename = 
StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
-                    } else {
-                        outputFilename = originalFilename;
-                    }
-
-                    flowFile = session.importFrom(stream, finalFlowFile);
-                    flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), outputFilename);
-
-                    stopWatch.stop();
-                    getLogger().info("Successfully received content from {} 
for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
-                    flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-                    session.getProvenanceReporter().fetch(flowFile, 
qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
-                    session.transfer(flowFile, getSuccessRelationship());
-                } catch (final FileNotFoundException | AccessControlException 
e) {
-                    getLogger().error("Failed to retrieve content from {} for 
{} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
-                    flowFile = session.putAttribute(flowFile, 
getAttributePrefix() + ".failure.reason", e.getMessage());
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, getFailureRelationship());
-                } catch (final IOException e) {
-                    getLogger().error("Failed to retrieve content from {} for 
{} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, 
e});
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, getCommsFailureRelationship());
-                } finally {
-                    IOUtils.closeQuietly(stream);
+            FlowFile outgoingFlowFile = finalFlowFile;
+            final Path qualifiedPath = path.makeQualified(hdfs.getUri(), 
hdfs.getWorkingDirectory());
+            try {
+                final String outputFilename;
+                final String originalFilename = path.getName();
+                stream = hdfs.open(path, 16384);
+
+                // Check if compression codec is defined (inferred or 
otherwise)
+                if (codec != null) {
+                    stream = codec.createInputStream(stream);
+                    outputFilename = StringUtils.removeEnd(originalFilename, 
codec.getDefaultExtension());
+                } else {
+                    outputFilename = originalFilename;
                 }
 
-                return null;
+                outgoingFlowFile = session.importFrom(stream, finalFlowFile);
+                outgoingFlowFile = session.putAttribute(outgoingFlowFile, 
CoreAttributes.FILENAME.key(), outputFilename);
+
+                stopWatch.stop();
+                getLogger().info("Successfully received content from {} for {} 
in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
+                outgoingFlowFile = session.putAttribute(outgoingFlowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
+                session.getProvenanceReporter().fetch(outgoingFlowFile, 
qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                session.transfer(outgoingFlowFile, getSuccessRelationship());
+            } catch (final FileNotFoundException | AccessControlException e) {
+                getLogger().error("Failed to retrieve content from {} for {} 
due to {}; routing to failure", new Object[]{qualifiedPath, outgoingFlowFile, 
e});
+                outgoingFlowFile = session.putAttribute(outgoingFlowFile, 
getAttributePrefix() + ".failure.reason", e.getMessage());
+                outgoingFlowFile = session.penalize(outgoingFlowFile);
+                session.transfer(outgoingFlowFile, getFailureRelationship());
+            } catch (final IOException e) {
+                if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", qualifiedPath, outgoingFlowFile, e);
+                    outgoingFlowFile = session.penalize(outgoingFlowFile);
+                    session.transfer(outgoingFlowFile, getCommsFailureRelationship());
+                }
+            } finally {
+                IOUtils.closeQuietly(stream);
             }
+
+            return null;
         });
     }
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index f1e8661366..c168a18fa2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -49,6 +49,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
@@ -294,12 +295,12 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     }
                     if (logEmptyListing.getAndDecrement() > 0) {
                         getLogger().info("Obtained file listing in {} 
milliseconds; listing had {} items, {} of which were new",
-                                new Object[]{millis, listedFiles.size(), 
newItems});
+                                millis, listedFiles.size(), newItems);
                     }
                 }
             } catch (IOException e) {
-                context.yield();
-                getLogger().warn("Error while retrieving list of files due to 
{}", new Object[]{e});
+                handleAuthErrors(e, session, context, new 
GSSExceptionRollbackYieldSessionHandler());
+                getLogger().warn("Error while retrieving list of files due to 
{}", e.getMessage(), e);
                 return;
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -388,20 +389,20 @@ public class GetHDFS extends AbstractHadoopProcessor {
                 flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), outputFilename);
 
                 if (!keepSourceFiles && 
!getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> 
hdfs.delete(file, false))) {
-                    getLogger().warn("Could not remove {} from HDFS. Not 
ingesting this file ...",
-                            new Object[]{file});
+                    getLogger().warn("Could not remove {} from HDFS. Not 
ingesting this file ...", file);
                     session.remove(flowFile);
                     continue;
                 }
 
                 session.getProvenanceReporter().receive(flowFile, 
file.toString());
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("retrieved {} from HDFS {} in {} milliseconds 
at a rate of {}",
-                        new Object[]{flowFile, file, millis, dataRate});
+                getLogger().info("retrieved {} from HDFS {} in {} milliseconds 
at a rate of {}", flowFile, file, millis, dataRate);
             } catch (final Throwable t) {
-                getLogger().error("Error retrieving file {} from HDFS due to 
{}", new Object[]{file, t});
-                session.rollback();
-                context.yield();
+                if (!handleAuthErrors(t, session, context, new 
GSSExceptionRollbackYieldSessionHandler())) {
+                    getLogger().error("Error retrieving file {} from HDFS due 
to {}", file, t);
+                    session.rollback();
+                    context.yield();
+                }
             } finally {
                 IOUtils.closeQuietly(stream);
                 stream = null;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
index 8383732f77..23541f0e81 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
@@ -55,6 +55,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
 
 import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.ALL;
 import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.DIR;
@@ -75,10 +76,10 @@ import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequ
         @WritesAttribute(attribute = "hdfs.owner", description = "The user 
that owns the object in HDFS"),
         @WritesAttribute(attribute = "hdfs.group", description = "The group 
that owns the object in HDFS"),
         @WritesAttribute(attribute = "hdfs.lastModified", description = "The 
timestamp of when the object in HDFS was last modified, as milliseconds since 
midnight Jan 1, 1970 UTC"),
-        @WritesAttribute(attribute = "hdfs.length", description = ""
-                + "In case of files: The number of bytes in the file in HDFS.  
"
+        @WritesAttribute(attribute = "hdfs.length", description =
+                "In case of files: The number of bytes in the file in HDFS.  "
                 + "In case of dirs: Retuns storage space consumed by 
directory. "
-                + ""),
+                ),
         @WritesAttribute(attribute = "hdfs.count.files", description = "In 
case of type='directory' will represent total count of files under this dir. "
                 + "Won't be populated to other types of HDFS objects. "),
         @WritesAttribute(attribute = "hdfs.count.dirs", description = "In case 
of type='directory' will represent total count of directories under this dir 
(including itself). "
@@ -327,9 +328,12 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + 
e);
             session.transfer(ff, REL_FAILURE);
         } catch (final Exception e) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", 
new Object[]{e});
-            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + 
e);
-            session.transfer(ff, REL_FAILURE);
+            // Catch GSSExceptions and reset the resources
+            if (!handleAuthErrors(e, session, context, new 
GSSExceptionRollbackYieldSessionHandler())) {
+                getLogger().error("Failed to perform listing of HDFS due to 
{}", new Object[]{e});
+                ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " 
+ e);
+                session.transfer(ff, REL_FAILURE);
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 86fe791733..6a674599b7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -30,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.StopWatch;
 
@@ -109,9 +110,14 @@ public class GetHDFSSequenceFile extends GetHDFS {
                     logger.warn("Unable to delete path " + file.toString() + " 
from HDFS.  Will likely be picked up over and over...");
                 }
             } catch (Throwable t) {
-                logger.error("Error retrieving file {} from HDFS due to {}", 
new Object[]{file, t});
-                session.rollback();
-                context.yield();
+                final String errorString = "Error retrieving file {} from HDFS 
due to {}";
+                if (!handleAuthErrors(t, session, context, new 
GSSExceptionRollbackYieldSessionHandler())) {
+                    logger.error(errorString, file, t);
+                    session.rollback();
+                    context.yield();
+                } else {
+                    logger.warn(errorString, file, t);
+                }
             } finally {
                 stopWatch.stop();
                 long totalSize = 0;
@@ -132,12 +138,7 @@ public class GetHDFSSequenceFile extends GetHDFS {
     }
 
     protected Set<FlowFile> getFlowFiles(final Configuration conf, final FileSystem hdfs, final SequenceFileReader<Set<FlowFile>> reader, final Path file) throws Exception {
-        PrivilegedExceptionAction<Set<FlowFile>> privilegedExceptionAction = new PrivilegedExceptionAction<Set<FlowFile>>() {
-            @Override
-            public Set<FlowFile> run() throws Exception {
-                return reader.readSequenceFile(file, conf, hdfs);
-            }
-        };
+        PrivilegedExceptionAction<Set<FlowFile>> privilegedExceptionAction = () -> reader.readSequenceFile(file, conf, hdfs);
         UserGroupInformation userGroupInformation = getUserGroupInformation();
         if (userGroupInformation == null) {
             return privilegedExceptionAction.run();
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 33e1fac44c..aba9065820 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -46,10 +46,13 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
 import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -57,6 +60,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -255,7 +259,10 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                 throw new IOException("Input Directory or File does not exist 
in HDFS");
             }
         } catch (Exception e) {
-            getLogger().error("Failed to retrieve content from {} for {} due 
to {}; routing to failure", new Object[]{filenameValue, flowFile, e});
+            if (handleAuthErrors(e, session, context, new 
GSSExceptionRollbackYieldSessionHandler())) {
+                return;
+            }
+            getLogger().error("Failed to retrieve content from {} for {} due 
to {}; routing to failure", filenameValue, flowFile, e);
             flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", 
e.getMessage());
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
@@ -294,7 +301,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                 if (logEmptyListing.getAndDecrement() > 0) {
                     getLogger().info(
                             "Obtained file listing in {} milliseconds; listing 
had {} items, {} of which were new",
-                            new Object[]{millis, listedFiles.size(), 
newItems});
+                            millis, listedFiles.size(), newItems);
                 }
             }
         } catch (IOException e) {
@@ -322,7 +329,12 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             queueLock.unlock();
         }
 
-        processBatchOfFiles(files, context, session, flowFile);
+        try {
+            processBatchOfFiles(files, context, session, flowFile);
+            session.remove(flowFile);
+        } catch (UncheckedIOException e) {
+            handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
+        }
 
         queueLock.lock();
         try {
@@ -330,8 +342,6 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         } finally {
             queueLock.unlock();
         }
-
-        session.remove(flowFile);
     }
 
     protected void processBatchOfFiles(final List<Path> files, final 
ProcessContext context,
@@ -352,95 +362,95 @@ public class MoveHDFS extends AbstractHadoopProcessor {
 
         for (final Path file : files) {
 
-            ugi.doAs(new PrivilegedAction<Object>() {
-                @Override
-                public Object run() {
-                    FlowFile flowFile = session.create(parentFlowFile);
-                    try {
-                        final String originalFilename = file.getName();
-                        final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
-                        final Path newFile = new Path(outputDirPath, 
originalFilename);
-                        final boolean destinationExists = hdfs.exists(newFile);
-                        // If destination file already exists, resolve that
-                        // based on processor configuration
-                        if (destinationExists) {
-                            switch (processorConfig.getConflictResolution()) {
-                                case REPLACE_RESOLUTION:
-                                    // Remove destination file (newFile) to 
replace
-                                    if (hdfs.delete(newFile, false)) {
-                                        getLogger().info("deleted {} in order 
to replace with the contents of {}",
-                                                new Object[]{newFile, 
flowFile});
-                                    }
-                                    break;
-                                case IGNORE_RESOLUTION:
-                                    session.transfer(flowFile, REL_SUCCESS);
-                                    getLogger().info(
-                                            "transferring {} to success 
because file with same name already exists",
-                                            new Object[]{flowFile});
-                                    return null;
-                                case FAIL_RESOLUTION:
-                                    
session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                    getLogger().warn(
-                                            "penalizing {} and routing to 
failure because file with same name already exists",
-                                            new Object[]{flowFile});
-                                    return null;
-                                default:
-                                    break;
-                            }
+            ugi.doAs((PrivilegedAction<Object>) () -> {
+                FlowFile flowFile = session.create(parentFlowFile);
+                try {
+                    final String originalFilename = file.getName();
+                    final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
+                    final Path newFile = new Path(outputDirPath, 
originalFilename);
+                    final boolean destinationExists = hdfs.exists(newFile);
+                    // If destination file already exists, resolve that
+                    // based on processor configuration
+                    if (destinationExists) {
+                        switch (processorConfig.getConflictResolution()) {
+                            case REPLACE_RESOLUTION:
+                                // Remove destination file (newFile) to replace
+                                if (hdfs.delete(newFile, false)) {
+                                    getLogger().info("deleted {} in order to 
replace with the contents of {}",
+                                            new Object[]{newFile, flowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(flowFile, REL_SUCCESS);
+                                getLogger().info(
+                                        "transferring {} to success because 
file with same name already exists",
+                                        new Object[]{flowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                session.transfer(session.penalize(flowFile), 
REL_FAILURE);
+                                getLogger().warn(
+                                        "penalizing {} and routing to failure 
because file with same name already exists",
+                                        new Object[]{flowFile});
+                                return null;
+                            default:
+                                break;
                         }
+                    }
 
-                        // Create destination directory if it does not exist
-                        try {
-                            if 
(!hdfs.getFileStatus(outputDirPath).isDirectory()) {
-                                throw new IOException(outputDirPath.toString()
-                                        + " already exists and is not a 
directory");
-                            }
-                        } catch (FileNotFoundException fe) {
-                            if (!hdfs.mkdirs(outputDirPath)) {
-                                throw new IOException(outputDirPath.toString() 
+ " could not be created");
-                            }
-                            changeOwner(context, hdfs, outputDirPath);
+                    // Create destination directory if it does not exist
+                    try {
+                        if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
+                            throw new IOException(outputDirPath + " already 
exists and is not a directory");
+                        }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(outputDirPath)) {
+                            throw new IOException(outputDirPath + " could not 
be created");
                         }
+                        changeOwner(context, hdfs, outputDirPath);
+                    }
 
-                        boolean moved = false;
-                        for (int i = 0; i < 10; i++) { // try to rename 
multiple
-                            // times.
-                            if (processorConfig.getOperation().equals("move")) 
{
-                                if (hdfs.rename(file, newFile)) {
-                                    moved = true;
-                                    break;// rename was successful
-                                }
-                            } else {
-                                if (FileUtil.copy(hdfs, file, hdfs, newFile, 
false, conf)) {
-                                    moved = true;
-                                    break;// copy was successful
-                                }
+                    boolean moved = false;
+                    for (int i = 0; i < 10; i++) { // try to rename multiple
+                        // times.
+                        if (processorConfig.getOperation().equals("move")) {
+                            if (hdfs.rename(file, newFile)) {
+                                moved = true;
+                                break;// rename was successful
+                            }
+                        } else {
+                            if (FileUtil.copy(hdfs, file, hdfs, newFile, 
false, conf)) {
+                                moved = true;
+                                break;// copy was successful
                             }
-                            Thread.sleep(200L);// try waiting to let whatever 
might cause rename failure to resolve
-                        }
-                        if (!moved) {
-                            throw new ProcessException("Could not move file " 
+ file + " to its final filename");
                         }
+                        Thread.sleep(200L);// try waiting to let whatever 
might cause rename failure to resolve
+                    }
+                    if (!moved) {
+                        throw new ProcessException("Could not move file " + 
file + " to its final filename");
+                    }
 
-                        changeOwner(context, hdfs, newFile);
-                        final String outputPath = newFile.toString();
-                        final String newFilename = newFile.getName();
-                        final String hdfsPath = newFile.getParent().toString();
-                        flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), newFilename);
-                        flowFile = session.putAttribute(flowFile, 
ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                        final Path qualifiedPath = 
newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
-                        flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
-                        final String transitUri = hdfs.getUri() + 
StringUtils.prependIfMissing(outputPath, "/");
-                        session.getProvenanceReporter().send(flowFile, 
transitUri);
-                        session.transfer(flowFile, REL_SUCCESS);
-
-                    } catch (final Throwable t) {
-                        getLogger().error("Failed to rename on HDFS due to 
{}", new Object[]{t});
-                        session.transfer(session.penalize(flowFile), 
REL_FAILURE);
-                        context.yield();
+                    changeOwner(context, hdfs, newFile);
+                    final String outputPath = newFile.toString();
+                    final String newFilename = newFile.getName();
+                    final String hdfsPath = newFile.getParent().toString();
+                    flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), newFilename);
+                    flowFile = session.putAttribute(flowFile, 
ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final Path qualifiedPath = 
newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
+                    final String transitUri = hdfs.getUri() + 
StringUtils.prependIfMissing(outputPath, "/");
+                    session.getProvenanceReporter().send(flowFile, transitUri);
+                    session.transfer(flowFile, REL_SUCCESS);
+
+                } catch (final Throwable t) {
+                    final Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                    if (causeOptional.isPresent()) {
+                        throw new UncheckedIOException(new IOException(causeOptional.get()));
                     }
-                    return null;
+                    getLogger().error("Failed to rename on HDFS due to {}", 
new Object[]{t});
+                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                    context.yield();
                 }
+                return null;
             });
         }
     }
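
MoveHDFS differs slightly from the other processors above: the failure happens inside PrivilegedAction.run(), which cannot throw checked exceptions, so the GSS cause is wrapped in an UncheckedIOException, allowed to escape doAs(), and only then handed to handleAuthErrors() in the onTrigger try/catch shown earlier. A condensed sketch of that hand-off, assuming ugi, session, context, and a caught gssException are already in scope:

    // Condensed sketch of the MoveHDFS hand-off; not a drop-in snippet.
    try {
        ugi.doAs((PrivilegedAction<Object>) () -> {
            // checked exceptions cannot escape PrivilegedAction.run(), so the
            // auth failure is re-thrown as an unchecked wrapper
            throw new UncheckedIOException(new IOException(gssException));
        });
    } catch (final UncheckedIOException e) {
        // back on the trigger thread, the causal chain still ends in the
        // GSSException, so the shared handler can roll back and yield
        handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
    }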
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 91e91ff7b1..9a60b2fcc5 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.hadoop;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileStatus;
@@ -54,16 +53,14 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
-import org.ietf.jgss.GSSException;
 
 import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
 import java.security.PrivilegedAction;
@@ -73,11 +70,8 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -352,18 +346,18 @@ public class PutHDFS extends AbstractHadoopProcessor {
                             case REPLACE_RESOLUTION:
                                 if (hdfs.delete(copyFile, false)) {
                                     getLogger().info("deleted {} in order to 
replace with the contents of {}",
-                                            new Object[]{copyFile, 
putFlowFile});
+                                            copyFile, putFlowFile);
                                 }
                                 break;
                             case IGNORE_RESOLUTION:
                                 session.transfer(putFlowFile, 
getSuccessRelationship());
                                 getLogger().info("transferring {} to success 
because file with same name already exists",
-                                        new Object[]{putFlowFile});
+                                        putFlowFile);
                                 return null;
                             case FAIL_RESOLUTION:
                                 
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
                                 getLogger().warn("penalizing {} and routing to 
failure because file with same name already exists",
-                                        new Object[]{putFlowFile});
+                                        putFlowFile);
                                 return null;
                             default:
                                 break;
@@ -372,63 +366,58 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
                     // Write FlowFile to temp file on HDFS
                     final StopWatch stopWatch = new StopWatch(true);
-                    session.read(putFlowFile, new InputStreamCallback() {
-
-                        @Override
-                        public void process(InputStream in) throws IOException 
{
-                            OutputStream fos = null;
-                            Path createdFile = null;
-                            try {
-                                if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-                                    fos = hdfs.append(copyFile, bufferSize);
-                                } else {
-                                    final EnumSet<CreateFlag> cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                                    if (shouldIgnoreLocality(context, 
session)) {
-                                        
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                                    }
+                    session.read(putFlowFile, in -> {
+                        OutputStream fos = null;
+                        Path createdFile = null;
+                        try {
+                            if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+                                fos = hdfs.append(copyFile, bufferSize);
+                            } else {
+                                final EnumSet<CreateFlag> cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-                                    fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                                            
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-                                            null, null);
+                                if (shouldIgnoreLocality(context, session)) {
+                                    
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                 }
 
-                                if (codec != null) {
-                                    fos = codec.createOutputStream(fos);
+                                fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                                
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+                                        null, null);
+                            }
+
+                            if (codec != null) {
+                                fos = codec.createOutputStream(fos);
+                            }
+                            createdFile = actualCopyFile;
+                            BufferedInputStream bis = new 
BufferedInputStream(in);
+                            StreamUtils.copy(bis, fos);
+                            bis = null;
+                            fos.flush();
+                        } finally {
+                            try {
+                                if (fos != null) {
+                                    fos.close();
                                 }
-                                createdFile = actualCopyFile;
-                                BufferedInputStream bis = new 
BufferedInputStream(in);
-                                StreamUtils.copy(bis, fos);
-                                bis = null;
-                                fos.flush();
-                            } finally {
-                                try {
-                                    if (fos != null) {
-                                        fos.close();
-                                    }
-                                } catch (Throwable t) {
-                                    // when talking to remote HDFS clusters, 
we don't notice problems until fos.close()
-                                    if (createdFile != null) {
-                                        try {
-                                            hdfs.delete(createdFile, false);
-                                        } catch (Throwable ignore) {
-                                        }
+                            } catch (Throwable t) {
+                                // when talking to remote HDFS clusters, we 
don't notice problems until fos.close()
+                                if (createdFile != null) {
+                                    try {
+                                        hdfs.delete(createdFile, false);
+                                    } catch (Throwable ignore) {
                                     }
-                                    throw t;
                                 }
-                                fos = null;
+                                throw t;
                             }
+                            fos = null;
                         }
-
                     });
                     stopWatch.stop();
                     final String dataRate = 
stopWatch.calculateDataRate(putFlowFile.getSize());
                     final long millis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
                     tempDotCopyFile = tempCopyFile;
 
-                    if  (
-                        writingStrategy.equals(WRITE_AND_RENAME)
-                        && (!conflictResponse.equals(APPEND_RESOLUTION) || 
(conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
+                    if (
+                            writingStrategy.equals(WRITE_AND_RENAME)
+                                    && 
(!conflictResponse.equals(APPEND_RESOLUTION) || 
(conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
                     ) {
                         boolean renamed = false;
 
@@ -449,7 +438,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     }
 
                     getLogger().info("copied {} to HDFS at {} in {} 
milliseconds at a rate of {}",
-                            new Object[]{putFlowFile, copyFile, millis, 
dataRate});
+                            putFlowFile, copyFile, millis, dataRate);
 
                     final String newFilename = copyFile.getName();
                     final String hdfsPath = copyFile.getParent().toString();
@@ -462,18 +451,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
                     session.transfer(putFlowFile, getSuccessRelationship());
 
-                } catch (final IOException e) {
-                    Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
-                    if (causeOptional.isPresent()) {
-                        getLogger().warn("An error occurred while connecting to HDFS. "
-                                        + "Rolling back session, and penalizing flow file {}",
-                                new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
-                        session.rollback(true);
-                    } else {
-                        getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
-                        session.transfer(putFlowFile, getFailureRelationship());
-                    }
                 } catch (final Throwable t) {
+                    if (handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
+                        return null;
+                    }
                     if (tempDotCopyFile != null) {
                         try {
                             hdfs.delete(tempDotCopyFile, false);
@@ -548,21 +529,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
         return group == null || group.isEmpty() ? null : group;
     }
 
-    /**
-     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
-     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
-     * @param t The throwable to inspect for the cause.
-     * @return Throwable Cause
-     */
-    private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
-        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
-        return causalChain
-                .filter(expectedCauseType::isInstance)
-                .map(expectedCauseType::cast)
-                .filter(causePredicate)
-                .findFirst();
-    }
-
     protected void changeOwner(final ProcessContext context, final FileSystem 
hdfs, final Path name, final FlowFile flowFile) {
         try {
             // Change owner and group of file if configured to do so
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/GSSExceptionRollbackYieldSessionHandler.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/GSSExceptionRollbackYieldSessionHandler.java
new file mode 100644
index 0000000000..8183c1f4cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/GSSExceptionRollbackYieldSessionHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.function.BiConsumer;
+
+public class GSSExceptionRollbackYieldSessionHandler implements BiConsumer<ProcessSession, ProcessContext> {
+    @Override
+    public void accept(ProcessSession processSession, ProcessContext processContext) {
+        processSession.rollback();
+        processContext.yield();
+    }
+}
\ No newline at end of file
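
Note: the new handleAuthErrors(...) method on AbstractHadoopProcessor is not
visible in the hunks quoted above. Based on the findCause(...) logic removed
from PutHDFS and the imports added to AbstractHadoopProcessor (Throwables,
GSSException, BiConsumer, ProcessSession), a plausible sketch of how it drives
this handler is the following; treat it as illustrative, not the committed
implementation:

    protected boolean handleAuthErrors(final Throwable t, final ProcessSession session,
            final ProcessContext context, final BiConsumer<ProcessSession, ProcessContext> handler) {
        // Walk the causal chain looking for a GSSException whose major code is NO_CRED
        final Optional<GSSException> causeOptional = Throwables.getCausalChain(t).stream()
                .filter(GSSException.class::isInstance)
                .map(GSSException.class::cast)
                .filter(gsse -> GSSException.NO_CRED == gsse.getMajor())
                .findFirst();
        if (causeOptional.isPresent()) {
            getLogger().warn("Hadoop authentication failed, rolling back session and yielding", causeOptional.get());
            handler.accept(session, context);  // rollback + yield, per the class above
            return true;
        }
        return false;
    }
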
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index af1df3d56b..7b59817d21 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -27,6 +27,10 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -37,13 +41,14 @@ import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class GetHDFSSequenceFileTest {
-    private HdfsResources hdfsResources;
+    private HdfsResources hdfsResourcesLocal;
     private GetHDFSSequenceFile getHDFSSequenceFile;
     private Configuration configuration;
     private FileSystem fileSystem;
@@ -55,9 +60,8 @@ public class GetHDFSSequenceFileTest {
         configuration = mock(Configuration.class);
         fileSystem = mock(FileSystem.class);
         userGroupInformation = mock(UserGroupInformation.class);
-        hdfsResources = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
-        getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
-        getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
+        hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
+        getHDFSSequenceFile = new TestableGetHDFSSequenceFile(new KerberosProperties(null), userGroupInformation);
         reloginTried = false;
         init();
     }
@@ -75,6 +79,7 @@ public class GetHDFSSequenceFileTest {
     public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws Exception {
         SequenceFileReader reader = mock(SequenceFileReader.class);
         Path file = mock(Path.class);
+        getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
         getHDFSSequenceFile.getFlowFiles(configuration, fileSystem, reader, file);
         ArgumentCaptor<PrivilegedExceptionAction> privilegedExceptionActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedExceptionAction.class);
         verifyNoMoreInteractions(reader);
@@ -86,7 +91,8 @@ public class GetHDFSSequenceFileTest {
 
     @Test
     public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
-        hdfsResources = new HdfsResources(configuration, fileSystem, null, null);
+        getHDFSSequenceFile = new TestableGetHDFSSequenceFile(new KerberosProperties(null), null);
+        hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, null, null);
         init();
         SequenceFileReader reader = mock(SequenceFileReader.class);
         Path file = mock(Path.class);
@@ -94,10 +100,45 @@ public class GetHDFSSequenceFileTest {
         verify(reader).readSequenceFile(file, configuration, fileSystem);
     }
 
+    @Test
+    public void testGSSExceptionOnDoAs() throws Exception {
+        NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        GetHDFSSequenceFile testSubject = new TestableGetHDFSSequenceFile(getHDFSSequenceFile.kerberosProperties, userGroupInformation, true);
+        TestRunner runner = TestRunners.newTestRunner(testSubject);
+        runner.setProperty(GetHDFSSequenceFile.DIRECTORY, "path/does/not/exist");
+        runner.run();
+        // assert no flowfiles transferred to outgoing relationships
+        runner.assertTransferCount(MoveHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(MoveHDFS.REL_FAILURE, 0);
+    }
+
     public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
+
+        UserGroupInformation userGroupInformation;
+        private KerberosProperties kerberosProperties;
+
+
+        public TestableGetHDFSSequenceFile(KerberosProperties kerberosProperties, UserGroupInformation ugi) throws IOException {
+            this(kerberosProperties, ugi, false);
+        }
+
+        public TestableGetHDFSSequenceFile(KerberosProperties kerberosProperties, UserGroupInformation ugi, boolean failOnDoAs) throws IOException {
+            this.kerberosProperties = kerberosProperties;
+            this.userGroupInformation = ugi;
+            if(failOnDoAs && userGroupInformation != null) {
+                try {
+                    when(userGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenThrow(new IOException(new GSSException(13)));
+                } catch (InterruptedException e) {
+                    throw new IOException(e);
+                }
+            }
+        }
+
+
         @Override
         HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
-            return hdfsResources;
+            return hdfsResourcesLocal;
         }
 
         @Override
@@ -109,5 +150,9 @@ public class GetHDFSSequenceFileTest {
         protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
             return kerberosProperties;
         }
+
+        protected UserGroupInformation getUserGroupInformation() {
+            return userGroupInformation;
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 903e38d750..436d2a053a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -31,6 +31,7 @@ import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -58,12 +59,11 @@ import static org.mockito.Mockito.when;
 @DisabledOnOs(OS.WINDOWS)
 public class GetHDFSTest {
 
-    private NiFiProperties mockNiFiProperties;
     private KerberosProperties kerberosProperties;
 
     @BeforeEach
     public void setup() {
-        mockNiFiProperties = mock(NiFiProperties.class);
+        NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
         when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
         kerberosProperties = new KerberosProperties(null);
     }
@@ -179,7 +179,7 @@ public class GetHDFSTest {
         assertEquals(1, flowFiles.size());
 
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
         flowFile.assertContentEquals(expected);
     }
@@ -198,7 +198,7 @@ public class GetHDFSTest {
         assertEquals(1, flowFiles.size());
 
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
         flowFile.assertContentEquals(expected);
     }
@@ -217,7 +217,7 @@ public class GetHDFSTest {
         assertEquals(1, flowFiles.size());
 
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
         flowFile.assertContentEquals(expected);
     }
@@ -236,7 +236,7 @@ public class GetHDFSTest {
         assertEquals(1, flowFiles.size());
 
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
         flowFile.assertContentEquals(expected);
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -304,12 +304,12 @@ public class GetHDFSTest {
                 final Object result;
                 if (callCounter == 0) {
                     when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists);
-                    result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                    result = ((PrivilegedExceptionAction<?>) invocationOnMock.getArgument(0)).run();
                     verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
                     verify(mockFileSystem).exists(any(Path.class));
                 } else {
                     when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]);
-                    result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                    result = ((PrivilegedExceptionAction<?>) invocationOnMock.getArgument(0)).run();
                     verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
                     verify(mockFileSystem).listStatus(any(Path.class));
                 }
@@ -322,7 +322,24 @@ public class GetHDFSTest {
 
         // THEN
         verify(mockFileSystem).getUri();
-        verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation);
+        verifyNoMoreInteractions(mockUserGroupInformation);
+    }
+
+    @Test
+    public void testGSSExceptionOnExists() throws Exception {
+        FileSystem mockFileSystem = mock(FileSystem.class);
+        UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class);
+
+        GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation);
+        TestRunner runner = TestRunners.newTestRunner(testSubject);
+        runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
+        when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenThrow(new IOException(new GSSException(13)));
+        runner.run();
+
+        // Assert session rollback
+        runner.assertTransferCount(GetHDFS.REL_SUCCESS, 0);
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
     }
 
     private static class TestableGetHDFS extends GetHDFS {
@@ -340,8 +357,8 @@ public class GetHDFSTest {
     }
 
     private static class TestableGetHDFSForUGI extends TestableGetHDFS {
-        private FileSystem mockFileSystem;
-        private UserGroupInformation mockUserGroupInformation;
+        private final FileSystem mockFileSystem;
+        private final UserGroupInformation mockUserGroupInformation;
 
         public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) {
             super(testKerberosProperties);
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
index 8e14507571..6a95d9d66d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -18,27 +18,38 @@ package org.apache.nifi.processors.hadoop;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import javax.security.sasl.SaslException;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,7 +64,6 @@ public class MoveHDFSTest {
     private static final String OUTPUT_DIRECTORY = "target/test-data-output";
     private static final String TEST_DATA_DIRECTORY = "src/test/resources/testdata";
     private static final String INPUT_DIRECTORY = "target/test-data-input";
-    private NiFiProperties mockNiFiProperties;
     private KerberosProperties kerberosProperties;
 
     @BeforeAll
@@ -63,7 +73,7 @@ public class MoveHDFSTest {
 
     @BeforeEach
     public void setup() {
-        mockNiFiProperties = mock(NiFiProperties.class);
+        NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
         when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
         kerberosProperties = new KerberosProperties(null);
     }
@@ -244,6 +254,37 @@ public class MoveHDFSTest {
         assertEquals(0, flowFiles.size());
     }
 
+    @Test
+    public void testPutFileWithGSSException() throws IOException {
+        MockFileSystem noCredentialsFileSystem = new MockFileSystem() {
+            @Override
+            public FileStatus getFileStatus(Path path) throws IOException {
+                throw new IOException("ioe", new SaslException("sasle", new GSSException(13)));
+            }
+        };
+        noCredentialsFileSystem.setFailOnExists(true);
+        TestRunner runner = TestRunners.newTestRunner(new TestableMoveHDFS(kerberosProperties, noCredentialsFileSystem));
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "input/does/not/exist");
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, "target/test-classes");
+        runner.setProperty(MoveHDFS.CONFLICT_RESOLUTION, "replace");
+
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        // assert no flowfiles transferred to outgoing relationships
+        runner.assertTransferCount(MoveHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(MoveHDFS.REL_FAILURE, 0);
+        // assert the processor's queue is not empty (session rollback)
+        assertFalse(runner.isQueueEmpty());
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
+        noCredentialsFileSystem.setFailOnExists(false);
+    }
+
     @Test
     public void testPutWhenAlreadyExistingShouldFailWhenFAIL_RESOLUTION() throws IOException {
         testPutWhenAlreadyExisting(MoveHDFS.FAIL_RESOLUTION, MoveHDFS.REL_FAILURE, "randombytes-1");
@@ -292,15 +333,30 @@ public class MoveHDFSTest {
 
     private static class TestableMoveHDFS extends MoveHDFS {
 
-        private KerberosProperties testKerberosProperties;
+        private final KerberosProperties testKerberosProperties;
+        private final FileSystem fileSystem;
 
         public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
+            this(testKerberosProperties, null);
+        }
+
+        public TestableMoveHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) {
             this.testKerberosProperties = testKerberosProperties;
+            this.fileSystem = fileSystem;
         }
 
         @Override
         protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
             return testKerberosProperties;
         }
+
+        @Override
+        protected FileSystem getFileSystem(Configuration config) throws IOException {
+            return fileSystem == null ? super.getFileSystem(config) : fileSystem;
+        }
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem == null ? super.getFileSystem() : fileSystem;
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index ab33df270e..637f312b56 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -17,24 +17,20 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.Progressable;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
@@ -47,13 +43,9 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import javax.security.sasl.SaslException;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -63,7 +55,6 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.times;
@@ -75,7 +66,7 @@ public class PutHDFSTest {
     private final static String FILE_NAME = "randombytes-1";
 
     private KerberosProperties kerberosProperties;
-    private FileSystem mockFileSystem;
+    private MockFileSystem mockFileSystem;
 
     @BeforeEach
     public void setup() {
@@ -359,19 +350,6 @@ public class PutHDFSTest {
         // assert no flowfiles transferred to outgoing relationships
         runner.assertTransferCount(PutHDFS.REL_SUCCESS, 0);
         runner.assertTransferCount(PutHDFS.REL_FAILURE, 0);
-        // assert the input flowfile was penalized
-        List<MockFlowFile> penalizedFlowFiles = runner.getPenalizedFlowFiles();
-        assertEquals(1, penalizedFlowFiles.size());
-        assertEquals("randombytes-1", penalizedFlowFiles.iterator().next().getAttribute(CoreAttributes.FILENAME.key()));
-        // assert the processor's queue is not empty
-        assertFalse(runner.isQueueEmpty());
-        assertEquals(1, runner.getQueueSize().getObjectCount());
-        // assert the input file is back on the queue
-        ProcessSession session = runner.getProcessSessionFactory().createSession();
-        FlowFile queuedFlowFile = session.get();
-        assertNotNull(queuedFlowFile);
-        assertEquals("randombytes-1", queuedFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
-        session.rollback();
     }
 
     @Test
@@ -610,7 +588,8 @@ public class PutHDFSTest {
 
     @Test
     public void testPutFileWithCloseException() throws IOException {
-        mockFileSystem = new MockFileSystem(true);
+        mockFileSystem = new MockFileSystem();
+        mockFileSystem.setFailOnClose(true);
         String dirName = "target/testPutFileCloseException";
         File file = new File(dirName);
         file.mkdirs();
@@ -635,10 +614,38 @@ public class PutHDFSTest {
         mockFileSystem.delete(p, true);
     }
 
+    @Test
+    public void testPutFileWithCreateException() throws IOException {
+        mockFileSystem = new MockFileSystem();
+        mockFileSystem.setFailOnCreate(true);
+        String dirName = "target/testPutFileCreateException";
+        File file = new File(dirName);
+        file.mkdirs();
+        Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
+
+        TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem));
+        runner.setProperty(PutHDFS.DIRECTORY, dirName);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+        assertFalse(failedFlowFiles.isEmpty());
+        assertTrue(failedFlowFiles.get(0).isPenalized());
+
+        mockFileSystem.delete(p, true);
+    }
+
     private class TestablePutHDFS extends PutHDFS {
 
-        private KerberosProperties testKerberosProperties;
-        private FileSystem fileSystem;
+        private final KerberosProperties testKerberosProperties;
+        private final FileSystem fileSystem;
 
         TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) {
             this.testKerberosProperties = testKerberosProperties;
@@ -661,134 +668,4 @@ public class PutHDFSTest {
             return fileSystem;
         }
     }
-
-    private static class MockFileSystem extends FileSystem {
-        private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
-        private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
-        private final boolean failOnClose;
-
-        public MockFileSystem() {
-            failOnClose = false;
-        }
-
-        public MockFileSystem(boolean failOnClose) {
-            this.failOnClose = failOnClose;
-        }
-
-        public void setAcl(final Path path, final List<AclEntry> aclSpec) {
-            pathToAcl.put(path, aclSpec);
-        }
-
-        @Override
-        public AclStatus getAclStatus(final Path path) {
-            return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
-        }
-
-        @Override
-        public URI getUri() {
-            return URI.create("file:///");
-        }
-
-        @Override
-        public FSDataInputStream open(final Path f, final int bufferSize) {
-            return null;
-        }
-
-        @Override
-        public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
-                                         final long blockSize, final Progressable progress) {
-            pathToStatus.put(f, newFile(f, permission));
-            if(failOnClose) {
-                return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
-                    @Override
-                    public void close() throws IOException {
-                        super.close();
-                        throw new IOException("Fail on close");
-                    }
-                };
-            } else {
-                return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
-            }
-        }
-
-        @Override
-        public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
-            return null;
-        }
-
-        @Override
-        public boolean rename(final Path src, final Path dst) {
-            if (pathToStatus.containsKey(src)) {
-                pathToStatus.put(dst, pathToStatus.remove(src));
-            } else {
-                return false;
-            }
-            return true;
-        }
-
-        @Override
-        public boolean delete(final Path f, final boolean recursive) {
-            if (pathToStatus.containsKey(f)) {
-                pathToStatus.remove(f);
-            } else {
-                return false;
-            }
-            return true;
-        }
-
-        @Override
-        public FileStatus[] listStatus(final Path f) {
-            return null;
-        }
-
-        @Override
-        public void setWorkingDirectory(final Path new_dir) {
-
-        }
-
-        @Override
-        public Path getWorkingDirectory() {
-            return new Path(new File(".").getAbsolutePath());
-        }
-
-        @Override
-        public boolean mkdirs(final Path f, final FsPermission permission) {
-            return false;
-        }
-
-        @Override
-        public boolean mkdirs(Path f) {
-            pathToStatus.put(f, newDir(f));
-            return true;
-        }
-
-        @Override
-        public FileStatus getFileStatus(final Path f) throws IOException {
-            final FileStatus fileStatus = pathToStatus.get(f);
-            if (fileStatus == null) throw new FileNotFoundException();
-            return fileStatus;
-        }
-
-        @Override
-        public boolean exists(Path f) {
-            return pathToStatus.containsKey(f);
-        }
-
-        private FileStatus newFile(Path p, FsPermission permission) {
-            return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
-        }
-
-        private FileStatus newDir(Path p) {
-            return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
-        }
-
-        @Override
-        public long getDefaultBlockSize(Path f) {
-            return 33554432L;
-        }
-    }
-
-    static FsPermission perms(short p) {
-        return new FsPermission(p);
-    }
 }
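
Note: the tests in this commit simulate the authentication failure with
new IOException(new GSSException(13)); the magic number 13 is the GSS major
code GSSException.NO_CRED ("no credentials"), which is the kind of failure an
expired Kerberos ticket typically surfaces as through Hadoop's RPC layer, and
exactly what the retry logic matches on. A more self-documenting equivalent,
were one writing these stubs fresh (hypothetical helper, not part of this
commit), would be:

    import java.io.IOException;
    import org.ietf.jgss.GSSException;

    // Builds the same wrapped "no credentials" failure the tests stub out,
    // using the named constant instead of the literal 13.
    final class GssFailures {
        static IOException noCredentials() {
            return new IOException(new GSSException(GSSException.NO_CRED));
        }
    }
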
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 6b64cca767..11b7c037de 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -27,6 +27,7 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.ietf.jgss.GSSException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -45,13 +46,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestDeleteHDFS {
-    private NiFiProperties mockNiFiProperties;
     private FileSystem mockFileSystem;
     private KerberosProperties kerberosProperties;
 
     @BeforeEach
     public void setup() throws Exception {
-        mockNiFiProperties = mock(NiFiProperties.class);
+        NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
         when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
         kerberosProperties = new KerberosProperties(null);
         mockFileSystem = mock(FileSystem.class);
@@ -114,6 +114,22 @@ public class TestDeleteHDFS {
         runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testGSSException() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenThrow(new IOException(new GSSException(13)));
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
+        Map<String, String> attributes = Maps.newHashMap();
+        attributes.put("hdfs.file", filePath.toString());
+        runner.enqueue("foo", attributes);
+        runner.run();
+        // GSS Auth exceptions should cause rollback
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
+    }
+
     @Test
     public void testPermissionIOException() throws Exception {
         Path filePath = new Path("/some/path/to/file.txt");
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 43f0c3ce6d..036904312c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -16,8 +16,10 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
@@ -44,17 +46,15 @@ import static org.mockito.Mockito.when;
 public class TestFetchHDFS {
 
     private TestRunner runner;
-    private TestableFetchHDFS proc;
-    private NiFiProperties mockNiFiProperties;
     private KerberosProperties kerberosProperties;
 
     @BeforeEach
     public void setup() {
-        mockNiFiProperties = mock(NiFiProperties.class);
+        NiFiProperties mockNiFiProperties = mock(NiFiProperties.class);
         when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
         kerberosProperties = new KerberosProperties(null);
 
-        proc = new TestableFetchHDFS(kerberosProperties);
+        TestableFetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
         runner = TestRunners.newTestRunner(proc);
     }
 
@@ -63,7 +63,7 @@ public class TestFetchHDFS {
         final String file = "src/test/resources/testdata/randombytes-1";
         final String fileWithMultipliedSeparators = "src/test////resources//testdata/randombytes-1";
         runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators);
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -83,7 +83,7 @@ public class TestFetchHDFS {
         final String file = destination.getAbsolutePath();
         final String fileWithMultipliedSeparators = "/" + file;
         runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators);
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -98,7 +98,7 @@ public class TestFetchHDFS {
     public void testFetchStaticFileThatDoesNotExist() {
         final String file = "src/test/resources/testdata/doesnotexist";
         runner.setProperty(FetchHDFS.FILENAME, file);
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
     }
@@ -111,7 +111,7 @@ public class TestFetchHDFS {
         final Map<String,String> attributes = new HashMap<>();
         attributes.put("my.file", file);
 
-        runner.enqueue(new String("trigger flow file"), attributes);
+        runner.enqueue("trigger flow file", attributes);
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
     }
@@ -120,7 +120,7 @@ public class TestFetchHDFS {
     public void testFilenameWithValidEL() {
         final String file = "src/test/resources/testdata/${literal('randombytes-1')}";
         runner.setProperty(FetchHDFS.FILENAME, file);
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
     }
@@ -136,7 +136,7 @@ public class TestFetchHDFS {
     public void testFilenameWithUnrecognizedEL() {
         final String file = "data_${literal('testing'):substring(0,4)%7D";
         runner.setProperty(FetchHDFS.FILENAME, file);
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
     }
@@ -147,14 +147,14 @@ public class TestFetchHDFS {
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
         runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
 
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
 
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
         flowFile.assertContentEquals(expected);
     }
@@ -165,14 +165,14 @@ public class TestFetchHDFS {
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
         runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
 
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
 
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
         flowFile.assertContentEquals(expected);
     }
@@ -183,28 +183,58 @@ public class TestFetchHDFS {
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/13545423550275052.zip");
         runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
-        runner.enqueue(new String("trigger flow file"));
+        runner.enqueue("trigger flow file");
         runner.run();
 
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
 
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        assertEquals("13545423550275052.zip", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
         flowFile.assertContentEquals(expected);
     }
 
+    @Test
+    public void testGSSException() throws IOException {
+        MockFileSystem fileSystem = new MockFileSystem();
+        fileSystem.setFailOnOpen(true);
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties, fileSystem);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(FetchHDFS.REL_FAILURE, 0);
+        runner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 0);
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
+        fileSystem.setFailOnOpen(false);
+    }
+
     private static class TestableFetchHDFS extends FetchHDFS {
         private final KerberosProperties testKerberosProps;
+        private final FileSystem fileSystem;
 
         public TestableFetchHDFS(KerberosProperties testKerberosProps) {
             this.testKerberosProps = testKerberosProps;
+            this.fileSystem = null;
+        }
+        public TestableFetchHDFS(KerberosProperties testKerberosProps, final FileSystem fileSystem) {
+            this.testKerberosProps = testKerberosProps;
+            this.fileSystem = fileSystem;
         }
 
         @Override
         protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
             return testKerberosProps;
         }
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem == null ? super.getFileSystem() : fileSystem;
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
index 0235f73940..2b23e02939 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java
@@ -17,17 +17,23 @@
 package org.apache.nifi.processors.hadoop;
 
 import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.util.MockFileSystem;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
@@ -36,21 +42,6 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -257,6 +248,28 @@ public class TestGetHDFSFileInfo {
         mff.assertAttributeEquals("hdfs.status", "Failed due to: java.io.InterruptedIOException");
     }
 
+    @Test
+    public void testWithGSSException() {
+        proc.fileSystem.setFailOnExists(true);
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
+        runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "false");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true");
+        runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT);
+
+        runner.run();
+
+        // Assert session rollback
+        runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 0);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0);
+
+        proc.fileSystem.setFailOnExists(false);
+    }
+
     @Test
     public void testRunWithPermissionsExceptionAttributes() throws InterruptedException {
 
@@ -789,135 +802,4 @@ public class TestGetHDFSFileInfo {
             return fileSystem;
         }
     }
-
-    private class MockFileSystem extends FileSystem {
-        private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
-        private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
-
-        public void addFileStatus(final FileStatus parent, final FileStatus child) {
-            Set<FileStatus> children = fileStatuses.get(parent.getPath());
-            if (children == null) {
-                children = new HashSet<>();
-                fileStatuses.put(parent.getPath(), children);
-            }
-            if (child != null) {
-                children.add(child);
-                if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
-                    fileStatuses.put(child.getPath(), new HashSet<FileStatus>());
-                }
-            }
-
-            pathToStatus.put(parent.getPath(), parent);
-            pathToStatus.put(child.getPath(), child);
-        }
-
-        @Override
-        @SuppressWarnings("deprecation")
-        public long getDefaultBlockSize() {
-            return 1024L;
-        }
-
-        @Override
-        @SuppressWarnings("deprecation")
-        public short getDefaultReplication() {
-            return 1;
-        }
-
-        @Override
-        public URI getUri() {
-            return null;
-        }
-
-        @Override
-        public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
-            return null;
-        }
-
-        @Override
-        public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
-                                         final long blockSize, final Progressable progress) throws IOException {
-            return null;
-        }
-
-        @Override
-        public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
-            return null;
-        }
-
-        @Override
-        public boolean rename(final Path src, final Path dst) throws IOException {
-            return false;
-        }
-
-        @Override
-        public boolean delete(final Path f, final boolean recursive) throws IOException {
-            return false;
-        }
-
-        @Override
-        public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
-            if (!fileStatuses.containsKey(f)) {
-                throw new FileNotFoundException();
-            }
-            if (f.getName().startsWith("list_exception_")) {
-                String clzName = f.getName().substring("list_exception_".length(), f.getName().length());
-                IOException exception = null;
-                try {
-                     exception = (IOException)Class.forName(clzName).newInstance();
-                } catch (Throwable t) {
-                    throw new RuntimeException(t);
-                }
-                throw exception;
-            }
-            final Set<FileStatus> statuses = fileStatuses.get(f);
-            if (statuses == null) {
-                return new FileStatus[0];
-            }
-
-            for (FileStatus s : statuses) {
-                getFileStatus(s.getPath()); //support exception handling only.
-            }
-
-            return statuses.toArray(new FileStatus[statuses.size()]);
-        }
-
-        @Override
-        public void setWorkingDirectory(final Path new_dir) {
-
-        }
-
-        @Override
-        public Path getWorkingDirectory() {
-            return new Path(new File(".").getAbsolutePath());
-        }
-
-        @Override
-        public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
-            return false;
-        }
-
-        @Override
-        public FileStatus getFileStatus(final Path f) throws IOException {
-            if (f!=null && f.getName().startsWith("exception_")) {
-                String clzName = f.getName().substring("exception_".length(), f.getName().length());
-                IOException exception = null;
-                try {
-                     exception = (IOException)Class.forName(clzName).newInstance();
-                } catch (Throwable t) {
-                    throw new RuntimeException(t);
-                }
-                throw exception;
-            }
-            final FileStatus fileStatus = pathToStatus.get(f);
-            if (fileStatus == null) throw new FileNotFoundException();
-            return fileStatus;
-        }
-
-        public FileStatus newFile(String p) {
-            return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
-        }
-        public FileStatus newDir(String p) {
-            return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
-        }
-    }
 }
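
Note: the per-test MockFileSystem inner classes removed from PutHDFSTest and
TestGetHDFSFileInfo above are consolidated into the shared test utility added
below, which exposes setFailOn*(...) switches for each failure mode. A minimal
usage sketch against the behavior defined below (hypothetical test class, name
assumed):

    import static org.junit.jupiter.api.Assertions.assertThrows;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.hadoop.util.MockFileSystem;
    import org.ietf.jgss.GSSException;
    import org.junit.jupiter.api.Test;

    public class MockFileSystemSketchTest {
        @Test
        public void existsFailureCarriesGssCause() throws IOException {
            final MockFileSystem fs = new MockFileSystem();
            fs.mkdirs(new Path("/some/dir"));
            assertTrue(fs.exists(new Path("/some/dir")));

            // Flip the switch: exists() now throws IOException(new GSSException(13))
            fs.setFailOnExists(true);
            final IOException e = assertThrows(IOException.class, () -> fs.exists(new Path("/some/dir")));
            assertTrue(e.getCause() instanceof GSSException);
        }
    }
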
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
new file mode 100644
index 0000000000..3a3477502e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.Progressable;
+import org.ietf.jgss.GSSException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MockFileSystem extends FileSystem {
+    private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+    private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
+    private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+    private boolean failOnOpen;
+    private boolean failOnClose;
+    private boolean failOnCreate;
+    private boolean failOnFileStatus;
+    private boolean failOnExists;
+
+
+    public void setFailOnClose(final boolean failOnClose) {
+        this.failOnClose = failOnClose;
+    }
+
+    public void setFailOnCreate(final boolean failOnCreate) {
+        this.failOnCreate = failOnCreate;
+    }
+
+    public void setFailOnFileStatus(final boolean failOnFileStatus) {
+        this.failOnFileStatus = failOnFileStatus;
+    }
+
+    public void setFailOnExists(final boolean failOnExists) {
+        this.failOnExists = failOnExists;
+    }
+
+    public void setFailOnOpen(final boolean failOnOpen) {
+        this.failOnOpen = failOnOpen;
+    }
+
+    public void setAcl(final Path path, final List<AclEntry> aclSpec) {
+        pathToAcl.put(path, aclSpec);
+    }
+
+    @Override
+    public AclStatus getAclStatus(final Path path) {
+        return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create("file:///");
+    }
+
+    @Override
+    public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
+        if (failOnOpen) {
+            throw new IOException(new GSSException(13));
+        }
+        return null;
+    }
+
+    @Override
+    public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
+                                     final long blockSize, final Progressable progress) throws IOException {
+        if (failOnCreate) {
+            // Simulate an AuthenticationException wrapped in an IOException
+            throw new IOException(new AuthenticationException("test auth error"));
+        }
+        pathToStatus.put(f, newFile(f, permission));
+        if(failOnClose) {
+            return new FSDataOutputStream(new ByteArrayOutputStream(), new FileSystem.Statistics("")) {
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                    throw new IOException("Fail on close");
+                }
+            };
+        } else {
+            return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
+        }
+    }
+
+    @Override
+    public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
+        return null;
+    }
+
+    @Override
+    public boolean rename(final Path src, final Path dst) {
+        if (pathToStatus.containsKey(src)) {
+            pathToStatus.put(dst, pathToStatus.remove(src));
+        } else {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean delete(final Path f, final boolean recursive) {
+        if (pathToStatus.containsKey(f)) {
+            pathToStatus.remove(f);
+        } else {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setWorkingDirectory(final Path new_dir) {
+
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        return new Path(new File(".").getAbsolutePath());
+    }
+
+    @Override
+    public boolean mkdirs(final Path f, final FsPermission permission) {
+        return false;
+    }
+
+    @Override
+    public boolean mkdirs(Path f) {
+        pathToStatus.put(f, newDir(f));
+        return true;
+    }
+
+    @Override
+    public FileStatus getFileStatus(final Path path) throws IOException {
+        if (failOnFileStatus) {
+            throw new IOException(new GSSException(13));
+        }
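+        // Paths named "exception_<fully-qualified class name>" throw that IOException type, letting tests inject arbitrary failures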
+        if (path != null && path.getName().startsWith("exception_")) {
+            final String className = path.getName().substring("exception_".length());
+            final IOException exception;
+            try {
+                exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance();
+            } catch (Throwable t) {
+                throw new RuntimeException(t);
+            }
+            throw exception;
+        }
+
+        final FileStatus fileStatus = pathToStatus.get(path);
+        if (fileStatus == null) {
+            throw new FileNotFoundException();
+        }
+        return fileStatus;
+    }
+
+    @Override
+    public boolean exists(Path f) throws IOException {
+        if (failOnExists) {
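+            // same NO_CRED simulation as open() and getFileStatus()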
+            throw new IOException(new GSSException(13));
+        }
+        return pathToStatus.containsKey(f);
+    }
+
+    public FileStatus newFile(Path p, FsPermission permission) {
+        return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
+    }
+
+    public FileStatus newDir(Path p) {
+        return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path) null, p, true, false, false);
+    }
+
+    public FileStatus newFile(String p) {
+        return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0644), "owner", "group", new Path(p));
+    }
+
+    public FileStatus newDir(String p) {
+        return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", new Path(p));
+    }
+
+    @Override
+    public long getDefaultBlockSize(Path f) {
+        return 33554432L;
+    }
+
+    public void addFileStatus(final FileStatus parent, final FileStatus child) {
+        Set<FileStatus> children = fileStatuses.computeIfAbsent(parent.getPath(), k -> new HashSet<>());
+        if (child != null) {
+            children.add(child);
+            if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
+                fileStatuses.put(child.getPath(), new HashSet<>());
+            }
+            // register the child inside the null check to avoid an NPE when only a parent is added
+            pathToStatus.put(child.getPath(), child);
+        }
+
+        pathToStatus.put(parent.getPath(), parent);
+    }
+
+    @Override
+    public FileStatus[] listStatus(final Path f) throws IOException {
+        if (!fileStatuses.containsKey(f)) {
+            throw new FileNotFoundException();
+        }
+
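+        // Same reflective convention as getFileStatus: "list_exception_<class>" paths throw the named IOException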
+        if (f.getName().startsWith("list_exception_")) {
+            final String className = f.getName().substring("list_exception_".length());
+            final IOException exception;
+            try {
+                exception = (IOException) Class.forName(className).getDeclaredConstructor().newInstance();
+            } catch (Throwable t) {
+                throw new RuntimeException(t);
+            }
+            throw exception;
+        }
+
+        final Set<FileStatus> statuses = fileStatuses.get(f);
+        if (statuses == null) {
+            return new FileStatus[0];
+        }
+
+        for (FileStatus s : statuses) {
+            getFileStatus(s.getPath()); // called only so the per-path exception hooks also fire during listing
+        }
+
+        return statuses.toArray(new FileStatus[0]);
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public long getDefaultBlockSize() {
+        return 1024L;
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public short getDefaultReplication() {
+        return 1;
+    }
+
+    private static FsPermission perms(short p) {
+        return new FsPermission(p);
+    }
+}
\ No newline at end of file
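
For reference, a minimal sketch (not part of this commit, JUnit 5 assumed) of how a
test can flip one of the failure flags and assert that the simulated GSS failure
surfaces as the IOException cause, which is what the processors' retry handling
inspects:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.ietf.jgss.GSSException;
    import org.junit.jupiter.api.Test;
    import static org.junit.jupiter.api.Assertions.assertInstanceOf;
    import static org.junit.jupiter.api.Assertions.assertThrows;

    @Test
    void openSurfacesGssFailure() {
        final MockFileSystem fs = new MockFileSystem();
        fs.setFailOnOpen(true);
        // open() throws IOException wrapping GSSException(NO_CRED)
        final IOException e = assertThrows(IOException.class, () -> fs.open(new Path("/test"), 4096));
        assertInstanceOf(GSSException.class, e.getCause());
    }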
