Repository: nifi
Updated Branches:
  refs/heads/master bb96b0f46 -> 08b66b5b6

NIFI-3969: Prevent merging flowfiles prematurely when all bins fill but some are already full and can be processed

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #1850.

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08b66b5b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08b66b5b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08b66b5b

Branch: refs/heads/master
Commit: 08b66b5b6a2fd4194d55a98900c1d898b4e74a28
Parents: bb96b0f
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed May 24 08:27:06 2017 -0400
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Wed May 24 19:36:18 2017 +0200

----------------------------------------------------------------------
 .../apache/nifi/processor/util/bin/BinFiles.java  |  2 +-
 .../processors/standard/TestMergeContent.java     | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/08b66b5b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 67e37c2..b15d23b 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -205,7 +205,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
         // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
         // bins. So we may as well expire it now.
-        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+        if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
             final Bin bin = binManager.removeOldestBin();
             if (bin != null) {
                 added++;

http://git-wip-us.apache.org/repos/asf/nifi/blob/08b66b5b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index b6025c5..b8f9210 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -911,6 +911,24 @@ public class TestMergeContent {
         bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE);
     }
 
+    @Test
+    public void testLeavesSmallBinUnmerged() {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MIN_ENTRIES, "5");
+        runner.setProperty(MergeContent.MAX_ENTRIES, "5");
+        runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+
+        for (int i = 0; i < 17; i++) {
+            runner.enqueue(String.valueOf(i) + "\n");
+        }
+
+        runner.run(5);
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 15);
+        assertEquals(2, runner.getQueueSize().getObjectCount());
+    }
+
     private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");