Repository: nifi Updated Branches: refs/heads/master 91ff810db -> 7ff14f719
NIFI-3091: Ensure that we set the appropriate size on FlowFiles when modifying them This closes #1267 Signed-off-by: jpercivall <jperciv...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7ff14f71 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7ff14f71 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7ff14f71 Branch: refs/heads/master Commit: 7ff14f7191f8bbb1722340ccdd963d3e7d24b9e4 Parents: 91ff810 Author: Mark Payne <marka...@hotmail.com> Authored: Wed Nov 23 11:09:42 2016 -0500 Committer: jpercivall <jperciv...@apache.org> Committed: Wed Nov 23 12:07:44 2016 -0500 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 3 +- .../repository/TestStandardProcessSession.java | 59 ++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 002bac9..80c917c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2580,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE cnfeThrown = true; throw cnfe; } finally { - this.bytesWritten += countingOut.getBytesWritten(); + writtenToFlowFile = countingOut.getBytesWritten(); + this.bytesWritten += writtenToFlowFile; this.bytesRead += countingIn.getBytesRead(); recursionSet.remove(source); http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 8cc088d..6f94994 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -277,6 +277,65 @@ public class TestStandardProcessSession { } @Test + public void testModifyContentWithStreamCallbackHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.write(original, (in, out) -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + @Test + public void testModifyContentWithOutputStreamCallbackHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.write(original, out -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + @Test + public void testModifyContentWithAppendHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.append(original, out -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + + + @Test public void testModifyContentThenRollback() throws IOException { assertEquals(0, contentRepo.getExistingClaims().size());