Repository: nifi
Updated Branches:
  refs/heads/master c13cfa6ea -> 6aefc0b91


NIFI-2751 NIFI-2848 Get batch of flow files in a round-robin manner

This closes #1111


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6aefc0b9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6aefc0b9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6aefc0b9

Branch: refs/heads/master
Commit: 6aefc0b9102bc83456ff537ade22946715bdcd30
Parents: c13cfa6
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Thu Oct 6 22:06:45 2016 +0200
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Mon Nov 7 14:01:18 2016 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 34 ++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6aefc0b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 799cab8..f851fdd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1250,7 +1250,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final List<Connection> connections = context.getPollableConnections();
         final int numConnections = connections.size();
         for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) 
{
-            final Connection conn = 
connections.get(context.getNextIncomingConnectionIndex() % connections.size());
+            final Connection conn = 
connections.get(context.getNextIncomingConnectionIndex() % numConnections);
             final Set<FlowFileRecord> expired = new HashSet<>();
             final FlowFileRecord flowFile = 
conn.getFlowFileQueue().poll(expired);
             removeExpired(expired, conn);
@@ -1273,7 +1273,11 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             return Collections.emptyList();
         }
 
-        return get(new QueuePoller() {
+        // get batch of flow files in a round-robin manner
+        final List<Connection> connections = context.getPollableConnections();
+        final Connection connection = 
connections.get(context.getNextIncomingConnectionIndex() % connections.size());
+
+        return get(connection, new QueuePoller() {
             @Override
             public List<FlowFileRecord> poll(final FlowFileQueue queue, final 
Set<FlowFileRecord> expiredRecords) {
                 return queue.poll(new FlowFileFilter() {
@@ -1302,6 +1306,32 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }, true);
     }
 
+    private List<FlowFile> get(final Connection connection, final QueuePoller 
poller, final boolean lockQueue) {
+        if (lockQueue) {
+            connection.lock();
+        }
+
+        try {
+            final Set<FlowFileRecord> expired = new HashSet<>();
+            final List<FlowFileRecord> newlySelected = 
poller.poll(connection.getFlowFileQueue(), expired);
+            removeExpired(expired, connection);
+
+            if (newlySelected.isEmpty() && expired.isEmpty()) {
+                return new ArrayList<>();
+            }
+
+            for (final FlowFileRecord flowFile : newlySelected) {
+                registerDequeuedRecord(flowFile, connection);
+            }
+
+            return new ArrayList<FlowFile>(newlySelected);
+        } finally {
+            if (lockQueue) {
+                connection.unlock();
+            }
+        }
+    }
+
     private List<FlowFile> get(final QueuePoller poller, final boolean 
lockAllQueues) {
         final List<Connection> connections = context.getPollableConnections();
         if (lockAllQueues) {

Reply via email to