[ https://issues.apache.org/jira/browse/NIFI-6831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jon Kessler reassigned NIFI-6831:
---------------------------------

Assignee: Jon Kessler

Create a flowfile queue implementation with global data priority awareness
--------------------------------------------------------------------------

Key: NIFI-6831
URL: https://issues.apache.org/jira/browse/NIFI-6831
Project: Apache NiFi
Issue Type: New Feature
Components: Core Framework
Affects Versions: 1.11.0
Reporter: Jon Kessler
Assignee: Jon Kessler
Priority: Major

There is currently no way to process data in order of priority on a flow-wide, or global, scale. The way the framework currently sorts by a priority attribute has several shortcomings that I believe a new flowfile queue implementation can address:
* Scheduling: Data priority is not considered when deciding which component receives the next available thread.
* Constant sorting: All FlowFiles in a given connection share the same PriorityQueue, so they must be sorted again every time they move into a new connection. Each sort is efficient, but the cost adds up as queues grow deep.
* Administration: Keeping the attribute values used as priority rankings up to date as priorities change takes costly manual effort. You must also ensure that every connection in the relevant flow has the proper prioritizer assigned to it in order to make use of the attribute.

The design goals of this new priority mechanism and flowfile queue implementation are:
* Instead of using the value of a FlowFile attribute as a ranking, maintain a set of Expression Language rules that define your priorities. The highest-ranked rule that a given FlowFile satisfies will be that FlowFile's priority.
* Because the set of priority rules is finite, connections can use a bucket sort, with one bucket per priority rule.
  The bucket (rule) with which a FlowFile is associated will be retained, so as the FlowFile moves through the system it does not have to be re-evaluated against the ruleset unless there is a reason to do so.
* Use a new Prioritizer implementation, BucketPrioritizer, to control where in your flow FlowFiles are evaluated against the ruleset.
* When this queue implementation is polled, it will check state to see whether any data of a higher priority than what it currently holds has recently (within the last 5 seconds) moved elsewhere in the system. If so, the connection will provide a FlowFile only X% of the time, where X is defined along with the rule. This gives higher-priority data more frequent access to threads without starving lower-priority data of threads.
* Rules will be managed through a menu option for the flow, and changes to them take effect immediately. This lets you change priorities without stopping, editing, and restarting components on the graph.

Additional design considerations:
Sorting takes place on insertion into any connection on which a BucketPrioritizer is set. Once a FlowFile has been sorted into a bucket, that assignment is retained, so each time the FlowFile moves into a new connection its bucket is already known without a BucketPrioritizer being set on that connection. Each bucket in a connection is simply a FIFO queue, so no additional sorting is done. You should only need to configure BucketPrioritizers at the points in your flow where you believe there is enough information to determine priority accurately, and not beyond those points unless you want to re-evaluate downstream for some reason. Setting BucketPrioritizers on these connections does involve some administration, but it should be minimal per flow (sometimes a single connection).
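The bucket-per-rule queue and the 5-second staggering check described above can be sketched in plain Java. This is a minimal single-JVM sketch under stated assumptions, not NiFi's FlowFileQueue or Prioritizer API: the class names `SketchFlowFile`, `PriorityRule`, `RuleSet` and `BucketedQueue` are hypothetical, Java predicates stand in for Expression Language rules, an in-memory map with an injectable clock stands in for the shared "recently moved" state, and a boolean flag stands in for a BucketPrioritizer on the connection. It also models the rule-deletion behavior, where a FlowFile whose cached rule no longer exists is re-evaluated on entering any connection. Two behaviors the issue does not specify are assumptions here: unevaluated FlowFiles are polled ahead of every bucket, and FlowFiles that match no rule (or whose rule was deleted while they were queued) are drained last without staggering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

// Minimal sketch, not NiFi's API: every class name below is hypothetical.
// Stand-in for a FlowFile: attributes plus the cached rule id (its bucket).
final class SketchFlowFile {
    final Map<String, String> attributes;
    String ruleId; // null until the FlowFile first enters a connection with a BucketPrioritizer
    SketchFlowFile(Map<String, String> attributes) { this.attributes = attributes; }
}

// A Java predicate stands in for an Expression Language condition;
// staggerPercent is X, the share of polls honored while higher-priority data is moving.
final class PriorityRule {
    final String id;
    final Predicate<Map<String, String>> condition;
    final int staggerPercent;
    PriorityRule(String id, Predicate<Map<String, String>> condition, int staggerPercent) {
        this.id = id; this.condition = condition; this.staggerPercent = staggerPercent;
    }
}

// Flow-wide ordered ruleset (index 0 = highest priority) plus the shared
// "when did data of this rule last move" state that every queue consults.
final class RuleSet {
    static final String NO_MATCH = "__no_match__"; // hypothetical id for FlowFiles matching no rule
    static final long RECENT_MS = 5_000;             // the 5 s window from the issue text
    private final List<PriorityRule> rules = new ArrayList<>();
    private final Map<String, Long> lastMoved = new HashMap<>();
    private final LongSupplier clockMillis;
    RuleSet(LongSupplier clockMillis) { this.clockMillis = clockMillis; }
    void add(PriorityRule rule) { rules.add(rule); } // appended as the lowest-ranked rule
    void remove(String id) { rules.removeIf(r -> r.id.equals(id)); }
    List<PriorityRule> ranked() { return rules; }
    boolean exists(String id) { return rules.stream().anyMatch(r -> r.id.equals(id)); }
    // The highest-ranked rule the FlowFile satisfies becomes its priority.
    String evaluate(SketchFlowFile ff) {
        for (PriorityRule r : rules) if (r.condition.test(ff.attributes)) return r.id;
        return NO_MATCH;
    }
    void recordMove(String id) { lastMoved.put(id, clockMillis.getAsLong()); }
    // True if data of any rule ranked above `rank` moved within the last RECENT_MS.
    boolean higherMovedRecently(int rank) {
        long now = clockMillis.getAsLong();
        for (int i = 0; i < rank; i++) {
            Long t = lastMoved.get(rules.get(i).id);
            if (t != null && now - t <= RECENT_MS) return true;
        }
        return false;
    }
}

// One connection: a FIFO per bucket, so nothing is re-sorted after evaluation.
final class BucketedQueue {
    private final RuleSet ruleSet;
    private final boolean hasBucketPrioritizer;
    private final Random random;
    private final Deque<SketchFlowFile> unevaluated = new ArrayDeque<>();
    private final Map<String, Deque<SketchFlowFile>> buckets = new LinkedHashMap<>();
    BucketedQueue(RuleSet ruleSet, boolean hasBucketPrioritizer, Random random) {
        this.ruleSet = ruleSet; this.hasBucketPrioritizer = hasBucketPrioritizer; this.random = random;
    }
    void offer(SketchFlowFile ff) {
        // A FlowFile whose cached rule was deleted is re-evaluated even without a prioritizer.
        boolean ruleDeleted = ff.ruleId != null && !ff.ruleId.equals(RuleSet.NO_MATCH)
                && !ruleSet.exists(ff.ruleId);
        if (hasBucketPrioritizer || ruleDeleted) ff.ruleId = ruleSet.evaluate(ff);
        if (ff.ruleId == null) unevaluated.addLast(ff);
        else buckets.computeIfAbsent(ff.ruleId, k -> new ArrayDeque<>()).addLast(ff);
    }
    // Returns null when the queue is empty or when this poll is deliberately yielded.
    SketchFlowFile poll() {
        if (!unevaluated.isEmpty()) return unevaluated.pollFirst(); // never staggered
        List<PriorityRule> ranked = ruleSet.ranked();
        for (int rank = 0; rank < ranked.size(); rank++) {
            PriorityRule rule = ranked.get(rank);
            Deque<SketchFlowFile> bucket = buckets.get(rule.id);
            if (bucket == null || bucket.isEmpty()) continue;
            // Higher-priority data moved recently: hand out a FlowFile only X% of the time.
            if (ruleSet.higherMovedRecently(rank) && random.nextInt(100) >= rule.staggerPercent) return null;
            ruleSet.recordMove(rule.id);
            return bucket.pollFirst();
        }
        // Leftovers (no matching rule, or rule deleted while queued): drained last, unstaggered.
        for (Deque<SketchFlowFile> bucket : buckets.values()) if (!bucket.isEmpty()) return bucket.pollFirst();
        return null;
    }
}
```

In this sketch, a rule with X = 0 makes a queue holding only that rule's data yield every poll for 5 seconds after higher-priority data moves anywhere, while X = 100 disables staggering for that rule. Once a FlowFile's bucket is known, insertion is an append to a FIFO rather than a priority-queue insertion in every connection; poll cost grows with the number of rules, which the issue describes as a finite set.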
When you delete a rule, each FlowFile that was already associated with it will be re-evaluated against the ruleset the next time it moves, on entering the next connection, regardless of whether a BucketPrioritizer is set on that connection. Also, FlowFiles that have yet to be evaluated (that have not yet encountered a BucketPrioritizer) will not be staggered. This was a deliberate design decision: if we don't know a FlowFile's priority, we should get it to the point in the flow where it will be evaluated as soon as possible. The decision was based on empirical evidence: when we did stagger unevaluated data, an incoming flow of high-priority data slowed down its own upstream processing once it was identified and processed as high priority, because the still-unevaluated part of that same flow upstream was being staggered in its favor.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)