[ https://issues.apache.org/jira/browse/NIFI-6831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jon Kessler reassigned NIFI-6831:
---------------------------------

    Assignee: Jon Kessler

> Create a flowfile queue implementation with global data priority awareness
> --------------------------------------------------------------------------
>
>                 Key: NIFI-6831
>                 URL: https://issues.apache.org/jira/browse/NIFI-6831
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.11.0
>            Reporter: Jon Kessler
>            Assignee: Jon Kessler
>            Priority: Major
>
> There is currently no way to process data in priority order on a flow-wide
> or global scale. There are several issues with the way sorting by a priority
> attribute is currently done in the framework that I believe we can address
> with a new flowfile queue implementation. Those shortcomings are:
>  * Scheduling: No consideration is given to data priority when determining
> which component is given the next available thread with which to work.
>  * Constant sorting: Because all FlowFiles in a given connection share the
> same PriorityQueue, they must be sorted every time they move. While this
> sort is efficient, its cost adds up as queues grow deep.
>  * Administration: There is a costly human element to managing the value used
> as a priority ranking as priorities change. You must also ensure that every
> connection in the appropriate flow has the proper prioritizer assigned to it
> to make use of that attribute.
> The design goals of this new priority mechanism and flowfile queue 
> implementation are:
>  * Instead of using the value of a FlowFile attribute as a ranking, maintain
> a set of Expression Language rules to define your priorities. The
> highest-ranked rule that a given FlowFile satisfies will be that FlowFile's
> priority.
>  * Because we have a finite set of priority rules, we can use a bucket sort
> in our connections, with one bucket per priority rule. The bucket/rule with
> which a FlowFile is associated will be maintained so that, as it moves
> through the system, we do not have to re-evaluate that FlowFile against our
> ruleset unless we have reason to do so.
>  * Control where in your flow FlowFiles are evaluated against the ruleset 
> with a new Prioritizer implementation: BucketPrioritizer.
>  * When this queue implementation is polled, it will be able to check state
> to see whether any data of a higher priority than what it currently contains
> recently (within 5 s) moved elsewhere in the system. If higher-priority data
> has recently moved elsewhere, the connection will only provide a FlowFile X%
> of the time, where X is defined along with the rule. This allows
> higher-priority data to have more frequent access to threads without
> thread-starving lower-priority data.
>  * Rules will be managed via a menu option for the flow, and changes to them
> take effect instantly. This allows you to change your priorities without
> stopping/editing/restarting various components on the graph.
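The rule ranking and the X% staggering described in the design goals can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not NiFi code: `PriorityRule`, `RuleSet` and `StaggerGate` are hypothetical names, a plain `Predicate` over attributes stands in for an Expression Language rule, FlowFiles matching no rule go to an assumed catch-all bucket, and the X applied on a poll is assumed to be the rate of the connection's best non-empty bucket.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for a priority rule. The proposal uses an Expression
// Language expression; a Predicate over FlowFile attributes substitutes here.
record PriorityRule(String name, Predicate<Map<String, String>> condition, int ratePercent) {}

// Hypothetical ranked rule set: index 0 is the highest priority.
final class RuleSet {
    private final List<PriorityRule> rules;
    private final int catchAllRatePercent; // assumed rate for FlowFiles matching no rule

    RuleSet(List<PriorityRule> rules, int catchAllRatePercent) {
        this.rules = List.copyOf(rules);
        this.catchAllRatePercent = catchAllRatePercent;
    }

    // One bucket per rule plus the assumed catch-all bucket.
    int bucketCount() { return rules.size() + 1; }

    int ratePercent(int bucket) {
        return bucket < rules.size() ? rules.get(bucket).ratePercent() : catchAllRatePercent;
    }

    // The highest-ranked rule a FlowFile satisfies determines its bucket.
    int bucketFor(Map<String, String> attributes) {
        for (int i = 0; i < rules.size(); i++) {
            if (rules.get(i).condition().test(attributes)) {
                return i;
            }
        }
        return rules.size(); // catch-all bucket (assumption)
    }
}

// Hypothetical flow-wide record of when data in each bucket last moved.
final class StaggerGate {
    static final long WINDOW_MILLIS = 5_000; // "recently" = within 5 s
    private static final long NEVER = Long.MIN_VALUE;

    private final RuleSet ruleSet;
    private final long[] lastMovedMillis;

    StaggerGate(RuleSet ruleSet) {
        this.ruleSet = ruleSet;
        this.lastMovedMillis = new long[ruleSet.bucketCount()];
        Arrays.fill(lastMovedMillis, NEVER);
    }

    void recordMove(int bucket, long nowMillis) {
        lastMovedMillis[bucket] = nowMillis;
    }

    // Should a connection whose best non-empty bucket is `localBucket` hand out a
    // FlowFile on this poll? `roll` is a uniform random value in [0, 1), passed in
    // so the decision is deterministic under test.
    boolean allowPoll(int localBucket, long nowMillis, double roll) {
        for (int b = 0; b < localBucket; b++) {
            boolean recent = lastMovedMillis[b] != NEVER
                    && nowMillis - lastMovedMillis[b] <= WINDOW_MILLIS;
            if (recent) {
                // Higher-priority data moved recently: provide only X% of the time.
                return roll < ruleSet.ratePercent(localBucket) / 100.0;
            }
        }
        return true; // no recent higher-priority activity: always provide
    }
}
```

In a real queue the caller would draw `roll` from something like `ThreadLocalRandom.current().nextDouble()`; taking it as a parameter keeps the gate itself free of randomness.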
> Additional design considerations:
> The sorting function here takes place on insertion into any connection on
> which a BucketPrioritizer is set. Once a FlowFile has been sorted into a
> bucket, we maintain that state so that each time it moves into a new
> connection we already know which bucket it should be placed in, without
> needing a BucketPrioritizer on that connection. Each bucket in a connection
> is just a FIFO queue, so no additional sorting is done. You should only have
> to configure connections to use the BucketPrioritizer at points in your flow
> where you believe you'll have enough information to accurately determine
> priority, and not beyond that point unless you want to re-evaluate
> downstream for some reason. There is administration involved in setting
> these BucketPrioritizers on some connections, but it should be minimal per
> flow (sometimes as few as one).
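A bucketed connection queue along these lines might look like the following minimal sketch. It assumes a hypothetical `SimpleFlowFile` that caches its bucket index, and a `ToIntFunction` standing in for the BucketPrioritizer (null when the connection has none). Serving unevaluated FlowFiles before any bucket is an assumption of this sketch; the proposal does not specify that ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical minimal FlowFile: attributes plus a cached bucket index that
// stays null until the FlowFile first passes a BucketPrioritizer.
final class SimpleFlowFile {
    final Map<String, String> attributes;
    Integer bucket; // carried from connection to connection, so no re-evaluation

    SimpleFlowFile(Map<String, String> attributes) {
        this.attributes = new HashMap<>(attributes);
    }
}

// Hypothetical connection queue: one FIFO deque per bucket plus one for
// FlowFiles not yet evaluated. Nothing is ever sorted.
final class BucketedQueue {
    private final List<ArrayDeque<SimpleFlowFile>> buckets = new ArrayList<>();
    private final ArrayDeque<SimpleFlowFile> unevaluated = new ArrayDeque<>();
    private final ToIntFunction<Map<String, String>> prioritizer; // null = none set

    BucketedQueue(int bucketCount, ToIntFunction<Map<String, String>> prioritizer) {
        for (int i = 0; i < bucketCount; i++) {
            buckets.add(new ArrayDeque<>());
        }
        this.prioritizer = prioritizer;
    }

    void offer(SimpleFlowFile ff) {
        if (prioritizer != null) {
            // Evaluation happens only on insertion into a connection with a prioritizer.
            ff.bucket = prioritizer.applyAsInt(ff.attributes);
        }
        if (ff.bucket == null) {
            unevaluated.addLast(ff);
        } else {
            buckets.get(ff.bucket).addLast(ff); // reuse the cached assignment
        }
    }

    // Assumption: unevaluated FlowFiles first, then buckets in rank order (FIFO within each).
    SimpleFlowFile poll() {
        if (!unevaluated.isEmpty()) {
            return unevaluated.pollFirst();
        }
        for (ArrayDeque<SimpleFlowFile> bucket : buckets) {
            if (!bucket.isEmpty()) {
                return bucket.pollFirst();
            }
        }
        return null;
    }
}
```

Because each bucket is a plain `ArrayDeque`, insertion and removal are constant-time, in contrast with the heap reordering a shared `PriorityQueue` performs on every move.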
> When you delete a rule, each FlowFile that was associated with that rule will
> be re-evaluated against the ruleset the next time it enters a connection,
> regardless of whether a BucketPrioritizer is set on that connection. Also,
> FlowFiles that have yet to be evaluated (that have yet to encounter a
> BucketPrioritizer) will not be staggered. This was a design decision: if we
> don't know the priority of a given FlowFile, we should get it to the point in
> the flow where it will be evaluated as soon as possible. The decision was
> based on empirical evidence that, when we did stagger unevaluated data, an
> incoming flow of high-priority data slowed down its own upstream processing
> once it was identified and processed as high priority.
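The rule-deletion and no-stagger behavior described above could be expressed along these lines. Again, this is only a hypothetical sketch: `LiveRuleSet`, `TrackedFlowFile` and `ConnectionEntry` are illustrative names, and a `Predicate` stands in for each Expression Language rule.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical live rule set keyed by rule id; insertion order = rank (first is highest).
final class LiveRuleSet {
    private final LinkedHashMap<String, Predicate<Map<String, String>>> rules = new LinkedHashMap<>();

    void add(String id, Predicate<Map<String, String>> condition) { rules.put(id, condition); }

    void delete(String id) { rules.remove(id); }

    boolean contains(String id) { return rules.containsKey(id); }

    // Id of the highest-ranked matching rule, or null if none matches (assumption).
    String evaluate(Map<String, String> attributes) {
        for (Map.Entry<String, Predicate<Map<String, String>>> e : rules.entrySet()) {
            if (e.getValue().test(attributes)) {
                return e.getKey();
            }
        }
        return null;
    }
}

// Hypothetical per-FlowFile state: whether it was ever evaluated, and the rule it matched.
final class TrackedFlowFile {
    final Map<String, String> attributes;
    boolean evaluated;
    String ruleId;

    TrackedFlowFile(Map<String, String> attributes) { this.attributes = attributes; }
}

final class ConnectionEntry {
    // Called whenever a FlowFile enters a connection.
    static void onEnter(TrackedFlowFile ff, LiveRuleSet rules, boolean hasBucketPrioritizer) {
        // A FlowFile whose rule was deleted is re-evaluated even without a prioritizer.
        boolean ruleDeleted = ff.evaluated && ff.ruleId != null && !rules.contains(ff.ruleId);
        if (hasBucketPrioritizer || ruleDeleted) {
            ff.ruleId = rules.evaluate(ff.attributes);
            ff.evaluated = true;
        }
    }

    // Unevaluated FlowFiles bypass staggering; only evaluated ones are subject to it.
    static boolean subjectToStaggering(TrackedFlowFile ff) {
        return ff.evaluated;
    }
}
```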



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
