Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 2d0bb1c1c -> 548f6d083


NIFI-703: Do not use prefetch if using FlowFileFilter, as this can result in FlowFiles not being pulled from the queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5d6f88c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5d6f88c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5d6f88c

Branch: refs/heads/develop
Commit: a5d6f88c2e0c6fc753fa753f259aaa6e3475049a
Parents: a1f0438
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jun 19 12:55:04 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Jun 19 12:55:04 2015 -0400

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java    | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5d6f88c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 8f6c8ed..f47ea2f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -363,7 +363,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             return true;
         }
 
-        if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) {
+        if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
             return true;
         }
 
@@ -437,7 +437,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
             final Iterator<FlowFileRecord> itr = swapQueue.iterator();
             while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
-                FlowFileRecord record = itr.next();
+                final FlowFileRecord record = itr.next();
                 swapRecords.add(record);
                 itr.remove();
             }
@@ -606,7 +606,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         boolean isExpired;
 
         migrateSwapToActive();
-        boolean queueFullAtStart = queueFullRef.get();
+        final boolean queueFullAtStart = queueFullRef.get();
 
         do {
             flowFile = this.activeQueue.poll();
@@ -794,9 +794,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         writeLock.lock();
         try {
             migrateSwapToActive();
-            if (activeQueue.isEmpty()) {
-                return Collections.emptyList();
-            }
 
             final long expirationMillis = this.flowFileExpirationMillis.get();
             final boolean queueFullAtStart = queueFullRef.get();
@@ -804,6 +801,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
 
+            // the prefetch doesn't allow us to add records back. So when this method is used,
+            // if there are prefetched records, we have to requeue them into the active queue first.
+            final PreFetch prefetch = preFetchRef.get();
+            if (prefetch != null) {
+                requeueExpiredPrefetch(prefetch);
+            }
+
             while (true) {
                 FlowFileRecord flowFile = this.activeQueue.poll();
                 if (flowFile == null) {
@@ -970,7 +974,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         boolean updated = false;
 
         do {
-            QueueSize queueSize = unacknowledgedSizeRef.get();
+            final QueueSize queueSize = unacknowledgedSizeRef.get();
             final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
             updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
         } while (!updated);

Reply via email to