Repository: nifi
Updated Branches:
  refs/heads/master b7e1f4813 -> c59b6fdf6


NIFI-4658 set Maximum Number of Entries to required and allow FlowFiles having 
fragment.count greater than Max Entries property

Signed-off-by: Mike Moser <[email protected]>

This closes #2559


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c59b6fdf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c59b6fdf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c59b6fdf

Branch: refs/heads/master
Commit: c59b6fdf66a33b68a6dd6a0a2b1da90cd98d9825
Parents: b7e1f48
Author: Mark Bean <[email protected]>
Authored: Fri Mar 16 19:43:29 2018 -0400
Committer: Mike Moser <[email protected]>
Committed: Mon Apr 2 20:42:04 2018 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/processor/util/bin/Bin.java | 14 ++++-----
 .../nifi/processor/util/bin/BinFiles.java       |  4 +--
 .../nifi/processor/util/bin/BinManager.java     |  4 +++
 .../processors/standard/TestMergeContent.java   | 30 ++++++++++++++++++++
 4 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
index b427e06..f95c470 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -48,10 +48,10 @@ public class Bin {
     private volatile int maximumEntries = Integer.MAX_VALUE;
     private final String fileCountAttribute;
 
-    final List<FlowFile> binContents = new ArrayList<>();
+    private final List<FlowFile> binContents = new ArrayList<>();
     private final Set<String> binIndexSet = new HashSet<>();
-    long size;
-    int successiveFailedOfferings = 0;
+    private long size;
+    private int successiveFailedOfferings = 0;
 
     /**
      * Constructs a new bin
@@ -141,11 +141,11 @@ public class Bin {
         if (fileCountAttribute != null) {
             final String countValue = 
flowFile.getAttribute(fileCountAttribute);
             final Integer count = toInteger(countValue);
-            if (count != null) {
-                int currentMaxEntries = this.maximumEntries;
-                this.maximumEntries = Math.min(count, currentMaxEntries);
-                this.minimumEntries = currentMaxEntries;
+            if (count == null) {
+                return false;
             }
+            this.maximumEntries = count;
+            this.minimumEntries = count;
 
             final String index = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
             if (index == null || index.isEmpty() || !binIndexSet.add(index)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 643aae4..bad0ded 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -72,9 +72,9 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
             .build();
     public static final PropertyDescriptor MAX_ENTRIES = new 
PropertyDescriptor.Builder()
             .name("Maximum Number of Entries")
-            .description("The maximum number of files to include in a bundle. 
If not specified, there is no maximum.")
+            .description("The maximum number of files to include in a bundle")
             .defaultValue("1000")
-            .required(false)
+            .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
index e6cec78..60c2966 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -76,6 +76,10 @@ public class BinManager {
         this.fileCountAttribute.set(fileCountAttribute);
     }
 
+    public String getFileCountAttribute() {
+        return fileCountAttribute.get();
+    }
+
     public void setMinimumEntries(final int minimumEntries) {
         this.minEntries.set(minimumEntries);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index af448d6..0af5f8b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -87,6 +87,9 @@ public class TestMergeContent {
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
         runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
 
+        final MockFlowFile bundle = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        assertEquals(1024 * 6, bundle.getSize());
+
         // Queue should not be empty because the first FlowFile will be 
transferred back to the input queue
         // when we run out @OnStopped logic, since it won't be transferred to 
any bin.
         runner.assertQueueNotEmpty();
@@ -887,6 +890,33 @@ public class TestMergeContent {
     }
 
     @Test
+    public void testDefragmentWithTooManyFragements() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
+        attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+
+        runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+        runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+        runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
+        runner.enqueue("Panama".getBytes("UTF-8"), attributes);
+
+        runner.run();
+
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        final MockFlowFile assembled = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        assembled.assertContentEquals("A Man A Plan A Canal 
Panama".getBytes("UTF-8"));
+    }
+
+    @Test
     public void testDefragmentWithTooFewFragments() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
         runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);

Reply via email to