Repository: nifi Updated Branches: refs/heads/master c13cfa6ea -> 6aefc0b91
NIFI-2751 NIFI-2848 Get batch of flow files in a round-robin manner This closes #1111 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6aefc0b9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6aefc0b9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6aefc0b9 Branch: refs/heads/master Commit: 6aefc0b9102bc83456ff537ade22946715bdcd30 Parents: c13cfa6 Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Thu Oct 6 22:06:45 2016 +0200 Committer: Matt Burgess <mattyb...@apache.org> Committed: Mon Nov 7 14:01:18 2016 -0500 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 34 ++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6aefc0b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 799cab8..f851fdd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1250,7 +1250,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final List<Connection> connections = context.getPollableConnections(); final int numConnections = connections.size(); for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) { - final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); + final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % numConnections); final Set<FlowFileRecord> expired = new HashSet<>(); final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired); removeExpired(expired, conn); @@ -1273,7 +1273,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return Collections.emptyList(); } - return get(new QueuePoller() { + // get batch of flow files in a round-robin manner + final List<Connection> connections = context.getPollableConnections(); + final Connection connection = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); + + return get(connection, new QueuePoller() { @Override public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) { return queue.poll(new FlowFileFilter() { @@ -1302,6 +1306,32 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE }, true); } + private List<FlowFile> get(final Connection connection, final QueuePoller poller, final boolean lockQueue) { + if (lockQueue) { + connection.lock(); + } + + try { + final Set<FlowFileRecord> expired = new HashSet<>(); + final List<FlowFileRecord> newlySelected = poller.poll(connection.getFlowFileQueue(), expired); + removeExpired(expired, connection); + + if (newlySelected.isEmpty() && expired.isEmpty()) { + return new ArrayList<>(); + } + + for (final FlowFileRecord flowFile : newlySelected) { + registerDequeuedRecord(flowFile, connection); + } + + return new ArrayList<FlowFile>(newlySelected); + } finally { + if (lockQueue) { + connection.unlock(); + } + } + } + private List<FlowFile> get(final QueuePoller poller, final boolean lockAllQueues) { final List<Connection> connections = context.getPollableConnections(); if (lockAllQueues) {