Repository: nifi
Updated Branches:
  refs/heads/master 91ff810db -> 7ff14f719


NIFI-3091: Ensure that we set the appropriate size on FlowFiles when modifying them

This closes #1267

Signed-off-by: jpercivall <jperciv...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7ff14f71
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7ff14f71
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7ff14f71

Branch: refs/heads/master
Commit: 7ff14f7191f8bbb1722340ccdd963d3e7d24b9e4
Parents: 91ff810
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Nov 23 11:09:42 2016 -0500
Committer: jpercivall <jperciv...@apache.org>
Committed: Wed Nov 23 12:07:44 2016 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      |  3 +-
 .../repository/TestStandardProcessSession.java  | 59 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 002bac9..80c917c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2580,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     cnfeThrown = true;
                     throw cnfe;
                 } finally {
-                    this.bytesWritten += countingOut.getBytesWritten();
+                    writtenToFlowFile = countingOut.getBytesWritten();
+                    this.bytesWritten += writtenToFlowFile;
                     this.bytesRead += countingIn.getBytesRead();
                     recursionSet.remove(source);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 8cc088d..6f94994 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -277,6 +277,65 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testModifyContentWithStreamCallbackHasCorrectSize() throws 
IOException {
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.write(original, (in, out) -> 
out.write("hello".getBytes()));
+        session.transfer(child);
+        session.commit();
+
+        final FlowFileRecord onQueue = 
flowFileQueue.poll(Collections.emptySet());
+        assertEquals(5, onQueue.getSize());
+    }
+
+    @Test
+    public void testModifyContentWithOutputStreamCallbackHasCorrectSize() 
throws IOException {
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.write(original, out -> 
out.write("hello".getBytes()));
+        session.transfer(child);
+        session.commit();
+
+        final FlowFileRecord onQueue = 
flowFileQueue.poll(Collections.emptySet());
+        assertEquals(5, onQueue.getSize());
+    }
+
+    @Test
+    public void testModifyContentWithAppendHasCorrectSize() throws IOException 
{
+        final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.append(original, out -> 
out.write("hello".getBytes()));
+        session.transfer(child);
+        session.commit();
+
+        final FlowFileRecord onQueue = 
flowFileQueue.poll(Collections.emptySet());
+        assertEquals(5, onQueue.getSize());
+    }
+
+
+
+    @Test
     public void testModifyContentThenRollback() throws IOException {
         assertEquals(0, contentRepo.getExistingClaims().size());
 

Reply via email to