Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 2d0bb1c1c -> 548f6d083
NIFI-703: Do not use prefetch if using FlowFileFilter, as this can result in FlowFiles not being pulled from the queue

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5d6f88c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5d6f88c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5d6f88c

Branch: refs/heads/develop
Commit: a5d6f88c2e0c6fc753fa753f259aaa6e3475049a
Parents: a1f0438
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jun 19 12:55:04 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Jun 19 12:55:04 2015 -0400

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java | 18 +++++++++++------
 1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5d6f88c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 8f6c8ed..f47ea2f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -363,7 +363,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             return true;
         }
 
-        if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) {
+        if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
             return true;
         }
 
@@ -437,7 +437,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
         final Iterator<FlowFileRecord> itr = swapQueue.iterator();
         while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
-            FlowFileRecord record = itr.next();
+            final FlowFileRecord record = itr.next();
             swapRecords.add(record);
             itr.remove();
         }
@@ -606,7 +606,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             boolean isExpired;
 
             migrateSwapToActive();
-            boolean queueFullAtStart = queueFullRef.get();
+            final boolean queueFullAtStart = queueFullRef.get();
 
             do {
                 flowFile = this.activeQueue.poll();
@@ -794,9 +794,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         writeLock.lock();
         try {
             migrateSwapToActive();
-            if (activeQueue.isEmpty()) {
-                return Collections.emptyList();
-            }
 
             final long expirationMillis = this.flowFileExpirationMillis.get();
             final boolean queueFullAtStart = queueFullRef.get();
@@ -804,6 +801,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
 
+            // the prefetch doesn't allow us to add records back. So when this method is used,
+            // if there are prefetched records, we have to requeue them into the active queue first.
+            final PreFetch prefetch = preFetchRef.get();
+            if (prefetch != null) {
+                requeueExpiredPrefetch(prefetch);
+            }
+
             while (true) {
                 FlowFileRecord flowFile = this.activeQueue.poll();
                 if (flowFile == null) {
@@ -970,7 +974,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         boolean updated = false;
 
         do {
-            QueueSize queueSize = unacknowledgedSizeRef.get();
+            final QueueSize queueSize = unacknowledgedSizeRef.get();
             final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
             updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
         } while (!updated);