Repository: nifi Updated Branches: refs/heads/master 67a47dbea -> 8a28395e9
NIFI-2754 - Migrating swap to active prior to swapping if necessary. - This closes #1000. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8a28395e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8a28395e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8a28395e Branch: refs/heads/master Commit: 8a28395e9feafdb3af8c76137bfe0f5f7a07e27e Parents: 67a47db Author: Peter Wicks <pwi...@micron.com> Authored: Fri Sep 9 22:10:01 2016 -0600 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Wed Sep 14 14:27:50 2016 -0400 ---------------------------------------------------------------------- .../nifi/controller/StandardFlowFileQueue.java | 5 +++- .../controller/TestStandardFlowFileQueue.java | 24 ++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8a28395e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 77f82d5..68af208 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -389,7 +389,8 @@ public class StandardFlowFileQueue implements FlowFileQueue { if (flowFile != null) { incrementActiveQueueSize(-1, -flowFile.getSize()); } - } while (isExpired); + } + while (isExpired); if (!expiredRecords.isEmpty()) { incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); @@ -547,6 +548,8 @@ public class StandardFlowFileQueue implements FlowFileQueue { return; } + migrateSwapToActive(); + final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE; int originalSwapQueueCount = swapQueue.size(); http://git-wip-us.apache.org/repos/asf/nifi/blob/8a28395e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 32c1dc6..3960d8d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -503,6 +504,29 @@ public class TestStandardFlowFileQueue { assertNull(status.getFailureReason()); } + @Test(timeout = 5000) + public void testListFlowFilesResultsLimitedCollection() throws InterruptedException { + Collection<FlowFileRecord> tff = new ArrayList<>(); + //Swap Size is 10000 records, so 30000 is equal to 3 swap files. + for (int i = 0; i < 30000; i++) { + tff.add(new TestFlowFile()); + } + + queue.putAll(tff); + + final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100); + assertNotNull(status); + assertEquals(30000, status.getQueueSize().getObjectCount()); + + while (status.getState() != ListFlowFileState.COMPLETE) { + Thread.sleep(100); + } + + assertEquals(100, status.getFlowFileSummaries().size()); + assertEquals(100, status.getCompletionPercentage()); + assertNull(status.getFailureReason()); + } + private class TestSwapManager implements FlowFileSwapManager { private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();