Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
pvillard31 commented on PR #8495: URL: https://github.com/apache/nifi/pull/8495#issuecomment-2044659154

@mattyb149 - I merged this into main but forgot to amend the commit and add the magic words to close the PR, so I'm closing it manually. Also, it does not apply cleanly to 1.x; can you open a PR for the support branch? Thanks!
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
pvillard31 closed pull request #8495: NIFI-12889: Retry Kerberos login on auth failure in HDFS processors URL: https://github.com/apache/nifi/pull/8495
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on PR #8495: URL: https://github.com/apache/nifi/pull/8495#issuecomment-2043896737

@mattyb149 Thanks for making these improvements! LGTM +1
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1554481562

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java:

```diff
@@ -108,6 +109,8 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
         if (!keepSourceFiles && !hdfs.delete(file, false)) {
             logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over...");
         }
+    } catch (final IOException e) {
```

Review Comment: I think the

```java
session.rollback();
context.yield();
```

calls were removed from the default exception handling here.
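For context, the shape being asked for, with the rollback and yield restored, might look like the sketch below. This is a hedged illustration, not the PR's code; the handleAuthErrors name and its boolean contract are taken from later comments in this thread:

```java
try {
    processBatchOfFiles(files, context, session);
} catch (final Throwable t) {
    // If this was not a recoverable Kerberos/GSS auth failure, keep the
    // original default handling: roll back the session and yield.
    if (!handleAuthErrors(t, session, context)) {
        getLogger().error("Error processing batch of files from HDFS", t);
        session.rollback();
        context.yield();
    }
}
```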
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1554480782

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:

```diff
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
     for (final Path file : files) {
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                FlowFile flowFile = session.create(parentFlowFile);
-                try {
-                    final String originalFilename = file.getName();
-                    final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
-                    final Path newFile = new Path(outputDirPath, originalFilename);
-                    final boolean destinationExists = hdfs.exists(newFile);
-                    // If destination file already exists, resolve that
-                    // based on processor configuration
-                    if (destinationExists) {
-                        switch (processorConfig.getConflictResolution()) {
-                            case REPLACE_RESOLUTION:
-                                // Remove destination file (newFile) to replace
-                                if (hdfs.delete(newFile, false)) {
-                                    getLogger().info("deleted {} in order to replace with the contents of {}",
-                                            new Object[]{newFile, flowFile});
-                                }
-                                break;
-                            case IGNORE_RESOLUTION:
-                                session.transfer(flowFile, REL_SUCCESS);
-                                getLogger().info("transferring {} to success because file with same name already exists",
-                                        new Object[]{flowFile});
-                                return null;
-                            case FAIL_RESOLUTION:
-                                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                        new Object[]{flowFile});
-                                return null;
-                            default:
-                                break;
-                        }
+        ugi.doAs((PrivilegedAction<Object>) () -> {
```

Review Comment: I think the patch didn't apply correctly; the NullPointerException still occurs. The catch block in the processBatchOfFiles method should look like this:

```java
catch (final Throwable t) {
    final Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
    if (causeOptional.isPresent()) {
        throw new UncheckedIOException(new IOException(causeOptional.get()));
    }
    getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
    session.transfer(session.penalize(flowFile), REL_FAILURE);
    context.yield();
}
```

In case we roll back the session here in the for loop, the NullPointerException appears; that's why we handle the exception outside of it.
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
tpalfy commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1554149017

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:

```diff
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
     for (final Path file : files) {
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                FlowFile flowFile = session.create(parentFlowFile);
-                try {
-                    final String originalFilename = file.getName();
-                    final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
-                    final Path newFile = new Path(outputDirPath, originalFilename);
-                    final boolean destinationExists = hdfs.exists(newFile);
-                    // If destination file already exists, resolve that
-                    // based on processor configuration
-                    if (destinationExists) {
-                        switch (processorConfig.getConflictResolution()) {
-                            case REPLACE_RESOLUTION:
-                                // Remove destination file (newFile) to replace
-                                if (hdfs.delete(newFile, false)) {
-                                    getLogger().info("deleted {} in order to replace with the contents of {}",
-                                            new Object[]{newFile, flowFile});
-                                }
-                                break;
-                            case IGNORE_RESOLUTION:
-                                session.transfer(flowFile, REL_SUCCESS);
-                                getLogger().info("transferring {} to success because file with same name already exists",
-                                        new Object[]{flowFile});
-                                return null;
-                            case FAIL_RESOLUTION:
-                                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                        new Object[]{flowFile});
-                                return null;
-                            default:
-                                break;
-                        }
+        ugi.doAs((PrivilegedAction<Object>) () -> {
```

Review Comment: This way, MoveHDFS throws a NullPointerException when the GSSException occurs, because it tries the next file after it has done a rollback.
Here's a patch (against the current state of this PR) that would help:

```
Subject: [PATCH] Changes based on code review comments
---
Index: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java (revision 024860a2d8dccbaed8e98d280e1afd22dd0b7201)
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java (date 1712341636424)
@@ -47,9 +47,11 @@
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;

 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -57,6 +59,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -325,16 +328,20 @@
             queueLock.unlock();
         }

-        processBatchOfFiles(files, context, session, flowFile);
+        try {
+            processBatchOfFiles(files, context, session, flowFile);
+            session.remo
```
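The patch text above is truncated; independent of its exact content, the idea under discussion is to let the per-file PrivilegedAction rethrow the auth failure and roll back once at the onTrigger level. A minimal sketch of that pattern, with all names assumed from the surrounding hunks rather than copied from the actual patch:

```java
try {
    processBatchOfFiles(files, context, session, flowFile);
} catch (final UncheckedIOException e) {
    // The per-file action wraps the GSS failure in an UncheckedIOException;
    // rolling back here, outside the loop, avoids the NullPointerException
    // triggered by rolling back mid-iteration.
    getLogger().warn("Authentication error while processing files; rolling back session", e);
    session.rollback();
    context.yield();
}
```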
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
tpalfy commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1554121881

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java:

```diff
@@ -388,18 +388,18 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
     flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

     if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
-        getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
-                new Object[]{file});
+        getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", file);
         session.remove(flowFile);
         continue;
     }

     session.getProvenanceReporter().receive(flowFile, file.toString());
     session.transfer(flowFile, REL_SUCCESS);
-    getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
-            new Object[]{flowFile, file, millis, dataRate});
+    getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", flowFile, file, millis, dataRate);
+} catch (final IOException e) {
```

Review Comment: (GitHub doesn't allow me to suggest efficiently) This way we don't handle non-GSS IOExceptions properly. Instead of

```java
} catch (final IOException e) {
    handleAuthErrors(e, session, context);
} catch (final Throwable t) {
    getLogger().error("Error retrieving file {} from HDFS due to {}", file, t);
    session.rollback();
    context.yield();
```

we should have

```java
} catch (final Throwable t) {
    if (!handleAuthErrors(t, session, context)) {
        getLogger().error("Error retrieving file {} from HDFS due to {}", file, t);
        session.rollback();
        context.yield();
    }
```

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:

```diff
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
     for (final Path file : files) {
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                FlowFile flowFile = session.create(parentFlowFile);
-                try {
-                    final String originalFilename = file.getName();
-                    final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
-                    final Path newFile = new Path(outputDirPath, originalFilename);
-                    final boolean destinationExists = hdfs.exists(newFile);
-                    // If destination file already exists, resolve that
-                    // based on processor configuration
-                    if (destinationExists) {
-                        switch (processorConfig.getConflictResolution()) {
-                            case REPLACE_RESOLUTION:
-                                // Remove destination file (newFile) to replace
-                                if (hdfs.delete(newFile, false)) {
-                                    getLogger().info("deleted {} in order to replace with the contents of {}",
-                                            new Object[]{newFile, flowFile});
-                                }
-                                break;
-                            case IGNORE_RESOLUTION:
-                                session.transfer(flowFile, REL_SUCCESS);
-                                getLogger().info("transferring {} to success because file with same name already exists",
-                                        new Object[]{flowFile});
-                                return null;
-                            case FAIL_RESOLUTION:
-                                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                        new Object[]{flowFile});
-                                return null;
-                            default:
-                                break;
-                        }
+        ugi.doAs((PrivilegedAction<Object>) () -> {
```

Review Comment: This way MoveHDFS is throwing a NullPointerException when the GSSException occurs, because it tries the next file after it did a rollback.
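The handleAuthErrors helper referenced in these suggestions is not shown in any quoted hunk. Inferred purely from its call sites in this thread (it returns true when it has dealt with a Kerberos/GSS failure), its shape is presumably something like the following sketch; the real method in the PR may differ:

```java
// Sketch only: signature and behavior inferred from calls like handleAuthErrors(t, session, context).
protected boolean handleAuthErrors(final Throwable t, final ProcessSession session, final ProcessContext context) {
    final Optional<GSSException> causeOptional =
            findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
    if (causeOptional.isPresent()) {
        getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources", causeOptional.get());
        // Reset the cached FileSystem/UGI so the Kerberos relogin can happen on the next trigger
        hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
        session.rollback();
        context.yield();
        return true;
    }
    return false;
}
```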
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1552005815

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java:

```diff
@@ -177,16 +177,20 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
     flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
     session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
 } catch (IOException ioe) {
-    // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
-    // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-    getLogger().warn("Failed to delete file or directory", ioe);
-
-    Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
-    // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
-    attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
-
-    session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
-    failedPath++;
+    if (handleAuthErrors(ioe, session, context)) {
+        return null;
+    } else {
+        // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
+        // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+        getLogger().warn("Failed to delete file or directory", ioe);
+
+        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
+        // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
+        attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
+
+        session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
+        failedPath++;
+    }
 }
```

Review Comment: Error handling of GSSException should be added to the outer catch at line 213 as well, because fileSystem.exists is called outside of the inner catch block.

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:

```diff
@@ -294,7 +302,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
     if (logEmptyListing.getAndDecrement() > 0) {
         getLogger().info(
                 "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
-                new Object[]{millis, listedFiles.size(), newItems});
+                millis, listedFiles.size(), newItems);
```

Review Comment: Could not comment on line 450: in processBatchOfFiles, at line 450, the GSSException should be handled as well.

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:

```diff
@@ -254,8 +254,16 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
     if (!directoryExists) {
         throw new IOException("Input Directory or File does not exist in HDFS");
     }
+} catch (final IOException e) {
```

Review Comment: The IOException does not need to be handled differently; it can be handled in the Exception catch branch:

```java
catch (Exception e) {
    if (!handleAuthErrors(e, session, context)) {
        getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e);
        flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
        flowFile = session.penalize(flowFile);
        session.transfer(flowFile, REL_FAILURE);
    }
    return;
}
```

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java: ## @@ -372,6
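On the earlier DeleteHDFS point about the outer catch near line 213: a hedged sketch of what extending it the same way might look like; the variable names and failure routing here are assumptions for illustration, not the PR's actual code:

```java
} catch (final IOException e) {
    // Same guard as the inner catch: only apply the generic failure handling
    // when the cause was not a Kerberos/GSS authentication error.
    if (!handleAuthErrors(e, session, context)) {
        getLogger().error("Failed to delete file or directory", e);
        session.transfer(session.penalize(flowFile), getFailureRelationship());
    }
}
```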
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1538284672

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -465,10 +465,16 @@ public void process(InputStream in) throws IOException {
     } catch (final IOException e) {
```

Review Comment: Since we throw the GSSException wrapped in a ProcessException, I think we have to catch the ProcessException, then check if its cause is a GSSException. If so, roll back; otherwise rethrow it.
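One possible reading of this suggestion, as a sketch; the call site and variable names are illustrative, not the PR's actual code:

```java
try {
    session.read(putFlowFile, in -> {
        // ... copy the FlowFile content to HDFS; a GSS failure in here
        // surfaces wrapped in a ProcessException ...
    });
} catch (final ProcessException pe) {
    final Optional<GSSException> causeOptional =
            findCause(pe, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
    if (causeOptional.isPresent()) {
        // Kerberos/GSS failure: roll back so the FlowFile is retried after the resources reset
        session.rollback();
        context.yield();
    } else {
        throw pe;
    }
}
```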
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1534451779

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                    }
+    session.read(putFlowFile, in -> {
+        OutputStream fos = null;
+        Path createdFile = null;
+        try {
+            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                fos = hdfs.append(copyFile, bufferSize);
+            } else {
+                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                            null, null);
+                if (shouldIgnoreLocality(context, session)) {
+                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                 }
-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
+                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                        null, null);
+            }
+
+            if (codec != null) {
+                fos = codec.createOutputStream(fos);
+            }
+            createdFile = actualCopyFile;
+            BufferedInputStream bis = new BufferedInputStream(in);
+            StreamUtils.copy(bis, fos);
+            bis = null;
+            fos.flush();
+        } catch (IOException e) {
+            // Catch GSSExceptions and reset the resources
+            Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+            if (causeOptional.isPresent()) {
+                getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources: {} ", e.getCause().getMessage());
+                hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
+            }
```

Review Comment: This change was intended to throw the exception to the outer catch block, which will log it.
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
mattyb149 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1534228284

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                    }
+    session.read(putFlowFile, in -> {
+        OutputStream fos = null;
+        Path createdFile = null;
+        try {
+            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                fos = hdfs.append(copyFile, bufferSize);
+            } else {
+                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                            null, null);
+                if (shouldIgnoreLocality(context, session)) {
+                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                }
-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
+                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                        null, null);
+            }
+
+            if (codec != null) {
+                fos = codec.createOutputStream(fos);
+            }
+            createdFile = actualCopyFile;
+            BufferedInputStream bis = new BufferedInputStream(in);
+            StreamUtils.copy(bis, fos);
+            bis = null;
+            fos.flush();
+        } catch (IOException e) {
+            // Catch GSSExceptions and reset the resources
+            Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+            if (causeOptional.isPresent()) {
+                getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources: {} ", e.getCause().getMessage());
+                hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
+            }
```

Review Comment: This suggested change removes the error logging and the reset; is that intentional?
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1527388335

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                    }
+    session.read(putFlowFile, in -> {
+        OutputStream fos = null;
+        Path createdFile = null;
+        try {
+            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                fos = hdfs.append(copyFile, bufferSize);
+            } else {
+                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                            null, null);
+                if (shouldIgnoreLocality(context, session)) {
+                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                }
-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
+                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                        null, null);
+            }
+
+            if (codec != null) {
+                fos = codec.createOutputStream(fos);
+            }
+            createdFile = actualCopyFile;
+            BufferedInputStream bis = new BufferedInputStream(in);
+            StreamUtils.copy(bis, fos);
+            bis = null;
+            fos.flush();
+        } catch (IOException e) {
```

Review Comment: Yeah, I think it's fine. If executing the mentioned lines does not cause a discrepancy, I prefer the simplicity.
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
mattyb149 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1526776640

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                    }
+    session.read(putFlowFile, in -> {
+        OutputStream fos = null;
+        Path createdFile = null;
+        try {
+            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                fos = hdfs.append(copyFile, bufferSize);
+            } else {
+                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                            null, null);
+                if (shouldIgnoreLocality(context, session)) {
+                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                }
-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
+                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                        null, null);
+            }
+
+            if (codec != null) {
+                fos = codec.createOutputStream(fos);
+            }
+            createdFile = actualCopyFile;
+            BufferedInputStream bis = new BufferedInputStream(in);
+            StreamUtils.copy(bis, fos);
+            bis = null;
+            fos.flush();
+        } catch (IOException e) {
+            // Catch GSSExceptions and reset the resources
+            Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+            if (causeOptional.isPresent()) {
+                getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources: {} ", e.getCause().getMessage());
```

Review Comment: Yeah, same thing as above: I can add the exception regardless (I think) and change the logging level to error if the user needs to be alerted immediately.
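As an aside on the logging question: assuming NiFi's ComponentLog follows the SLF4J convention of logging a trailing Throwable with its stack trace, the exception can be attached independent of the chosen level, so only the WARN versus ERROR choice changes visibility. An illustrative line (hypothetical, reusing the variables from the hunk above):

```java
// Trailing Throwable argument logs the stack trace as well (assumed SLF4J-style handling).
getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources: {}", e.getCause().getMessage(), e);
```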
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
mattyb149 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1526775762

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                    }
+    session.read(putFlowFile, in -> {
+        OutputStream fos = null;
+        Path createdFile = null;
+        try {
+            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                fos = hdfs.append(copyFile, bufferSize);
+            } else {
+                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                            null, null);
+                if (shouldIgnoreLocality(context, session)) {
+                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                }
-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
+                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                        null, null);
+            }
+
+            if (codec != null) {
+                fos = codec.createOutputStream(fos);
+            }
+            createdFile = actualCopyFile;
+            BufferedInputStream bis = new BufferedInputStream(in);
+            StreamUtils.copy(bis, fos);
+            bis = null;
+            fos.flush();
+        } catch (IOException e) {
+            // Catch GSSExceptions and reset the resources
+            Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+            if (causeOptional.isPresent()) {
+                getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources: {} ", e.getCause().getMessage());
```

Review Comment: The intent with WARN was not to generate a bulletin by default, but I'm fine either way.
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
mattyb149 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1526688710

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                    }
+    session.read(putFlowFile, in -> {
+        OutputStream fos = null;
+        Path createdFile = null;
+        try {
+            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                fos = hdfs.append(copyFile, bufferSize);
+            } else {
+                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                            null, null);
+                if (shouldIgnoreLocality(context, session)) {
+                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                }
-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
+                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                        null, null);
+            }
+
+            if (codec != null) {
+                fos = codec.createOutputStream(fos);
+            }
+            createdFile = actualCopyFile;
+            BufferedInputStream bis = new BufferedInputStream(in);
+            StreamUtils.copy(bis, fos);
+            bis = null;
+            fos.flush();
+        } catch (IOException e) {
```

Review Comment: Works for me; the key was to not have it blow up for a "simple" relogin error. I thought I'd found that, depending on the config, the first create() could cause the login error. If anything but a "Kerberos relogin error" occurs, I think we should rightfully propagate the exception; I can throw it as a ProcessException inside the read(). What do you think?
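A sketch of the "throw it as a ProcessException inside the read()" idea; only the catch is shown, the copy logic is elided, and the surrounding names are assumed from the hunks above:

```java
session.read(putFlowFile, in -> {
    try {
        // ... append or create on HDFS and copy the stream, as in the hunk above ...
    } catch (final IOException e) {
        final Optional<GSSException> causeOptional =
                findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
        if (causeOptional.isPresent()) {
            // A "simple" Kerberos relogin error: reset resources so the next attempt can relogin
            hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
        }
        // Propagate in either case; the outer catch decides between retry and failure routing
        throw new ProcessException(e);
    }
});
```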
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on code in PR #8495: URL: https://github.com/apache/nifi/pull/8495#discussion_r1524238959

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                    }
+    session.read(putFlowFile, in -> {
+        OutputStream fos = null;
+        Path createdFile = null;
+        try {
+            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                fos = hdfs.append(copyFile, bufferSize);
+            } else {
+                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                            null, null);
+                if (shouldIgnoreLocality(context, session)) {
+                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                }
-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
+                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                        null, null);
+            }
+
+            if (codec != null) {
+                fos = codec.createOutputStream(fos);
+            }
+            createdFile = actualCopyFile;
+            BufferedInputStream bis = new BufferedInputStream(in);
+            StreamUtils.copy(bis, fos);
+            bis = null;
+            fos.flush();
+        } catch (IOException e) {
+            // Catch GSSExceptions and reset the resources
+            Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+            if (causeOptional.isPresent()) {
```

Review Comment: Here we catch the IOException, but in case the underlying cause is not a GSSException with an error code of NO_CRED, the exception won't be handled and will be absorbed by the catch. The IOException should be thrown in the else branch.

## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:

```diff
@@ -372,54 +370,56 @@ public Object run() {

     // Write FlowFile to temp file on HDFS
     final StopWatch stopWatch = new StopWatch(true);
-    session.read(putFlowFile, new InputStreamCallback() {
-
-        @Override
-        public void process(InputStream in) throws IOException {
-            OutputStream fos = null;
-            Path createdFile = null;
-            try {
-                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                    fos = hdfs.append(copyFile, bufferSize);
-                } else {
-                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                    if (shouldIgnoreLocality(context, session)) {
-                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALI
```
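In other words, a sketch of the requested fix with the else branch rethrowing (based directly on the snippet quoted above):

```java
} catch (IOException e) {
    Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
    if (causeOptional.isPresent()) {
        getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources: {}", e.getCause().getMessage());
        hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
    } else {
        // Non-GSS IOExceptions must not be silently absorbed
        throw e;
    }
}
```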
Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
Lehel44 commented on PR #8495: URL: https://github.com/apache/nifi/pull/8495#issuecomment-1995416957

reviewing...
[PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]
mattyb149 opened a new pull request, #8495: URL: https://github.com/apache/nifi/pull/8495

# Summary

[NIFI-12889](https://issues.apache.org/jira/browse/NIFI-12889) This PR resets the HDFS resources upon Kerberos authentication failure so the relogin can happen correctly on the next try.

# Tracking

Please complete the following tracking steps prior to pull request creation.

### Issue Tracking

- [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created

### Pull Request Tracking

- [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-0`
- [x] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-0`

### Pull Request Formatting

- [x] Pull Request based on current revision of the `main` branch
- [x] Pull Request refers to a feature branch with one commit containing changes

# Verification

Please indicate the verification steps performed prior to pull request creation.

### Build

- [x] Build completed using `mvn clean install -P contrib-check`
  - [x] JDK 21

### Licensing

- [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
- [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files

### Documentation

- [ ] Documentation formatting appears as expected in rendered files