Jon Kessler created NIFI-6831:
---------------------------------

             Summary: Create a flowfile queue implementation with global data 
priority awareness
                 Key: NIFI-6831
                 URL: https://issues.apache.org/jira/browse/NIFI-6831
             Project: Apache NiFi
          Issue Type: New Feature
          Components: Core Framework
    Affects Versions: 1.11.0
            Reporter: Jon Kessler


There is currently no way to process data in order by priority on a flow-wide 
or global scale. There are several issues with the way sorting by priority 
attribute is currently done in the framework that I believe we can address with 
a new flowfile queue implementation. Those shortcomings are:
 * Scheduling: No consideration is given to data priority when determining 
which component is given the next available thread with which to work
 * Constant sorting: Because all flowfiles in a given connection share the same 
PriorityQueue they must be sorted every time they move. While this sort is 
efficient it can add up as queues grow deep.
 * Administration: There is a costly human element to managing the value used 
as a priority ranking as priorities change. You must also ensure every 
connection in the appropriate flow has the proper prioritizer assigned to it 
to make use of the property.

The design goals of this new priority mechanism and flowfile queue 
implementation are:
 * Instead of using the value of a FlowFile attribute as a ranking, maintain a 
set of expression language rules to define your priorities. The highest ranked 
rule that a given FlowFile satisfies will be that FlowFile's priority
 * Because we have a finite set of priority rules we can utilize a bucket sort 
in our connections, with one bucket per priority rule. The bucket/rule with 
which a FlowFile is associated will be maintained so that as it moves through 
the system we do not have to re-evaluate that FlowFile against our ruleset 
unless we have reason to do so.
 * Control where in your flow FlowFiles are evaluated against the ruleset with 
a new Prioritizer implementation: BucketPrioritizer.
 * When this queue implementation is polled it will be able to check state to 
see if any data of a higher priority than what it currently contains recently 
(within 5s) moved elsewhere in the system. If higher priority data has recently 
moved elsewhere, the connection will only provide a FlowFile X% of the time 
where X is defined along with the rule. This allows higher priority data to 
have more frequent access to threads without thread-starving lower priority 
data.
 * Rules will be managed via a menu option for the flow and changes to them 
take effect instantly. This allows you to change your priorities without 
stopping/editing/restarting various components on the graph.
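
As a rough illustration of the rule ranking and staggering goals above, here is 
a minimal sketch. It is not the proposed implementation: every class, field and 
method name is hypothetical, a plain Java predicate stands in for an expression 
language rule, and the random roll is passed in by the caller so the decision 
is easy to follow.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch; none of these names come from the NiFi codebase. */
final class PriorityRuleSketch {

    /** Recency window taken from the description above (5 seconds). */
    static final long RECENT_WINDOW_MILLIS = 5_000L;

    /** A rule: a predicate standing in for an expression language expression. */
    static final class Rule {
        final String name;
        final Predicate<Map<String, String>> matcher;
        /** The X% defined along with the rule, from 0 to 100. */
        final int pollPercent;

        Rule(String name, Predicate<Map<String, String>> matcher, int pollPercent) {
            this.name = name;
            this.matcher = matcher;
            this.pollPercent = pollPercent;
        }
    }

    /** Rules ordered from highest rank (index 0) to lowest. */
    private final List<Rule> rankedRules;

    PriorityRuleSketch(List<Rule> rankedRules) {
        this.rankedRules = List.copyOf(rankedRules);
    }

    /** Returns the index of the highest ranked matching rule, or -1 if none match. */
    int rank(Map<String, String> attributes) {
        for (int i = 0; i < rankedRules.size(); i++) {
            if (rankedRules.get(i).matcher.test(attributes)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Decides whether a connection whose best data has the given rank may hand
     * out a FlowFile. lastMovedMillis[i] is the last time data of rank i moved
     * anywhere in the system, or -1 if it never has. roll is a caller-supplied
     * random number in [0, 100).
     */
    boolean mayPoll(int rank, long[] lastMovedMillis, long nowMillis, int roll) {
        boolean outranked = false;
        for (int i = 0; i < rank; i++) {
            long moved = lastMovedMillis[i];
            if (moved >= 0 && nowMillis - moved <= RECENT_WINDOW_MILLIS) {
                outranked = true;
                break;
            }
        }
        // Only yield when higher priority data moved recently, and then only
        // (100 - X)% of the time, so lower priority data is never fully starved.
        return !outranked || roll < rankedRules.get(rank).pollPercent;
    }
}
```

With a 20% rule, a connection holding only that rule's data would be polled 
successfully about one time in five while higher priority data is moving, and 
every time once nothing of higher priority has moved for 5 seconds.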

Additional design considerations:

The sorting function here takes place on insertion into any connection on which 
a BucketPrioritizer is set. Once a FlowFile has been sorted into a bucket we 
maintain that state so that each time it moves into a new connection we already 
know in which bucket it should be placed without needing to have a 
BucketPrioritizer set on that connection. Each bucket in a connection is just a 
FIFO queue so no additional sorting is done. You should only have to configure 
connections to use the BucketPrioritizer at points in your flow where you 
believe you'll have enough information to accurately determine priority but not 
beyond that point unless you want to re-evaluate downstream for some reason. 
There is administration involved in setting these BucketPrioritizers on some 
connections but it should be minimal per flow (sometimes as few as one).
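
To make the bucket behavior concrete, here is a small sketch of a connection 
queue with one FIFO per rule. It is an illustration under assumptions, not the 
actual queue implementation: the names are hypothetical, the evaluator function 
stands in for a BucketPrioritizer, and serving unevaluated items ahead of the 
buckets is my own simplification of "get it to that point in the flow as soon 
as possible".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical sketch; none of these names come from the NiFi codebase. */
final class BucketQueueSketch {

    static final int UNEVALUATED = -1;

    /** Stand-in for a FlowFile that remembers the bucket it was sorted into. */
    static final class Item {
        final String id;
        int bucket = UNEVALUATED;

        Item(String id) {
            this.id = id;
        }
    }

    /** One FIFO per rule; index 0 is the highest priority. */
    private final List<Deque<Item>> buckets = new ArrayList<>();
    private final Deque<Item> unevaluated = new ArrayDeque<>();
    /** Stand-in for a BucketPrioritizer; null when none is set on this connection. */
    private final ToIntFunction<Item> evaluator;

    BucketQueueSketch(int ruleCount, ToIntFunction<Item> evaluator) {
        for (int i = 0; i < ruleCount; i++) {
            buckets.add(new ArrayDeque<>());
        }
        this.evaluator = evaluator;
    }

    /** Insertion is the only place where sorting happens. */
    void offer(Item item) {
        if (evaluator != null) {
            item.bucket = evaluator.applyAsInt(item); // (re-)evaluate here
        }
        if (item.bucket == UNEVALUATED) {
            unevaluated.addLast(item);
        } else {
            buckets.get(item.bucket).addLast(item); // reuse the remembered bucket
        }
    }

    /** Returns the next item, or null if the queue is empty. */
    Item poll() {
        if (!unevaluated.isEmpty()) {
            return unevaluated.pollFirst(); // assumption: unevaluated data goes first
        }
        for (Deque<Item> bucket : buckets) {
            if (!bucket.isEmpty()) {
                return bucket.pollFirst(); // plain FIFO inside each bucket
            }
        }
        return null;
    }
}
```

An item evaluated once upstream keeps its bucket field, so a downstream queue 
built with a null evaluator still places it in the right FIFO without touching 
the ruleset.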

When you delete a rule, each FlowFile that was already associated with that 
rule will be re-evaluated against the ruleset the next time it moves, when it 
enters the next connection, regardless of whether or not a BucketPrioritizer is 
set on that connection. Also, FlowFiles that have yet to be evaluated (have yet 
to encounter a BucketPrioritizer) will not be staggered. The design decision 
here was that if we don't know the priority of a given FlowFile we should get 
it to the point in the flow where it will be evaluated as soon as possible. 
This decision was a result of empirical evidence: when we did stagger 
unevaluated data, an incoming flow of high priority data slowed its own 
upstream processing down once it was identified and processed as high priority.
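
The rule deletion and unevaluated data behavior described above could look 
roughly like the following. Again this is only a sketch with hypothetical 
names: rules are keyed by a made-up string id, a Java predicate stands in for 
each expression language rule, and a null rule id represents a FlowFile that 
has not yet encountered a BucketPrioritizer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch; none of these names come from the NiFi codebase. */
final class RuleDeletionSketch {

    /** Live rules in rank order (LinkedHashMap preserves insertion order). */
    private final Map<String, Predicate<Map<String, String>>> liveRules = new LinkedHashMap<>();

    void addRule(String id, Predicate<Map<String, String>> matcher) {
        liveRules.put(id, matcher);
    }

    void deleteRule(String id) {
        liveRules.remove(id);
    }

    /** Returns the id of the highest ranked matching rule, or null if none match. */
    String evaluate(Map<String, String> attributes) {
        for (Map.Entry<String, Predicate<Map<String, String>>> e : liveRules.entrySet()) {
            if (e.getValue().test(attributes)) {
                return e.getKey();
            }
        }
        return null;
    }

    /**
     * Called when a FlowFile enters a connection. Returns the rule id it should
     * carry from here on; null means it is still unevaluated.
     */
    String onEnqueue(String currentRuleId, Map<String, String> attributes,
                     boolean bucketPrioritizerSet) {
        boolean ruleWasDeleted = currentRuleId != null && !liveRules.containsKey(currentRuleId);
        if (bucketPrioritizerSet || ruleWasDeleted) {
            return evaluate(attributes);
        }
        return currentRuleId;
    }

    /** Unevaluated FlowFiles are never staggered. */
    boolean isStaggerable(String ruleId) {
        return ruleId != null;
    }
}
```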



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
