NIFI-3553: If IOException is thrown when completing FTP transfer, do not update the FlowFile to point to the content that was streamed but instead leave FlowFile pointing to original version
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c1e65f95 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c1e65f95 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c1e65f95 Branch: refs/heads/support/nifi-0.7.x Commit: c1e65f95bd9951390215c8ef7e2b25cf95f8b5f1 Parents: cd1cf68 Author: Mark Payne <marka...@hotmail.com> Authored: Fri Mar 3 13:18:41 2017 -0500 Committer: Joe Skora <jsk...@apache.org> Committed: Thu Apr 27 14:56:14 2017 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/processors/standard/FetchFileTransfer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c1e65f95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index a7ae5ef..6182b0a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -239,9 +239,9 @@ public abstract class FetchFileTransfer extends AbstractProcessor { @Override public void process(final OutputStream out) throws IOException { StreamUtils.copy(in, out); + transfer.flush(); } }); - transfer.flush(); transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); } catch (final FileNotFoundException e) { getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", @@ -255,14 +255,14 @@ public abstract class FetchFileTransfer extends AbstractProcessor { session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); return; - } catch (final IOException e) { + } catch (final ProcessException | IOException e) { try { transfer.close(); } catch (final IOException e1) { getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[] {host, port, e.toString()}, e); } - getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to failure", + getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure", new Object[] {flowFile, filename, host, port, e.toString()}, e); session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE); return;