[ https://issues.apache.org/jira/browse/NIFI-8469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark Payne updated NIFI-8469:
-----------------------------
Fix Version/s: 1.14.0
Resolution: Fixed
Status: Resolved (was: Patch Available)

Change ProcessSession so that commits are asynchronous
------------------------------------------------------

Key: NIFI-8469
URL: https://issues.apache.org/jira/browse/NIFI-8469
Project: Apache NiFi
Issue Type: New Feature
Components: Core Framework, Extensions
Reporter: Mark Payne
Assignee: Mark Payne
Priority: Critical
Fix For: 1.14.0

Time Spent: 4h 10m
Remaining Estimate: 0h

Currently, ProcessSession.commit() guarantees that when the method call returns, all FlowFiles have been persisted to the repositories. As such, it is then safe to acknowledge or dispose of the data on the external system.

For example, a processor that consumes from JMS will consume the message, create a FlowFile from it, and then call ProcessSession.commit(). Only then is it safe to acknowledge the message on the JMS broker. If the message is acknowledged before ProcessSession.commit() is called and NiFi is restarted, the data may be lost. Once ProcessSession.commit() has returned, however, acknowledging is safe, because the content will be available to NiFi upon restart.

This API has served us well. Lately, however, there has also been a great deal of work and desire from the community to introduce other "runtimes." One of those is MiNiFi. Another is Stateless NiFi.

One of the distinguishing features of Stateless NiFi is that it stores content in memory. This means that fast (and large) disks are not necessary, but it also means that all FlowFiles are lost when the application restarts. In order to provide data reliability, Stateless NiFi requires that the source of data be both reliable and replayable (like a JMS broker or Apache Kafka, for instance). In this way, we can keep content in memory by deferring the message acknowledgment until after we've finished processing the message completely.
If the application is restarted in the middle, the message will not have been acknowledged and as a result will be replayed, so we maintain our strong at-least-once guarantees.

Another distinguishing feature of Stateless NiFi is that it is single-threaded. This allows it to be far more scalable and to consume fewer resources.

This works by allowing ProcessSession.commit() to enqueue data for the next Processor in the chain and then invoke the next Processor (recursively) before ProcessSession.commit() ever returns.

Unfortunately, though, some dataflows do not work well with such a model. Any flow that has MergeContent or MergeRecord in the middle will end up in a situation where the Processor never progresses. Take, for example, the following dataflow:

GetFile --> SplitText --> ReplaceText --> MergeContent --> PutS3Object

In this case, assume that SplitText splits an incoming FlowFile into 10 smaller FlowFiles. ReplaceText performs some manipulation. MergeContent is then expected to merge all 10 FlowFiles back into one.

However, because of how this model works, after SplitText the queue will hold 10 FlowFiles. ReplaceText will then be called, which will consume one FlowFile, manipulate it, and call ProcessSession.commit(). This will then enqueue the FlowFile for MergeContent. MergeContent will be triggered but will be unable to make progress because it doesn't have enough FlowFiles. The only choice the framework has is then to keep calling MergeContent until its entire queue is emptied, but the queue will never empty: the remaining nine FlowFiles are still waiting in ReplaceText's queue, and ReplaceText cannot be triggered again until its call to commit() returns. As a result, the dataflow ends up in an infinite loop, calling MergeContent, which makes no progress.

What we really want is to call ReplaceText repeatedly until its queue is empty, and only then move on to the next Processor (MergeContent). Unfortunately, this can't really be accomplished with the current semantics.
If we tried to do so, then when ReplaceText is triggered the first time and calls ProcessSession.commit(), we would have two choices:
* Recursively call ReplaceText.onTrigger(). This very quickly results in a StackOverflowError, so this approach doesn't work well.
* Have ProcessSession.commit() block while another thread is responsible for calling ReplaceText.onTrigger(). This results in spawning a new thread for each FlowFile in the queue, which can very quickly exhaust the available threads, leading to an OutOfMemoryError (or, even worse, depending on system/JVM settings, causing the entire operating system to crash).

So neither approach is viable. Additionally, any dataflow that has a self-loop, such as a failure loop, has the same issue, resulting in a StackOverflowError.

The idea here, then, is to deprecate ProcessSession.commit() in favor of a new ProcessSession.commitAsync(). (Perhaps there will be a better name, but we'll refer to it as such for the time being.) The differentiator is that commitAsync() would allow optional callbacks to be invoked after the session commit completes:
* void commitAsync();
* void commitAsync(Runnable successCallback);
* void commitAsync(Consumer<Throwable> failureCallback);
* void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback);

Most Processors have no need to call ProcessSession.commit(), because the abstract parent class takes care of it. Those that do call ProcessSession.commit() typically do so because they need to perform some cleanup action after the commit, such as acknowledging the message in the JMS case illustrated above. For these Processors, the logic would be updated to perform the cleanup in the callback.
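The four overloads listed above collapse naturally onto the two-argument form. The sketch below assumes a hypothetical interface named AsyncCommitSession (not NiFi's actual ProcessSession) whose convenience overloads are default methods, plus a toy SynchronousSession that follows the traditional-engine idea in this issue (commit first, then run the callback), extended with an assumed failure path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface mirroring the four proposed overloads.
// AsyncCommitSession is an illustrative name, not NiFi's ProcessSession.
interface AsyncCommitSession {

    // The general form; the convenience overloads below delegate to it.
    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback);

    default void commitAsync() {
        commitAsync(() -> { }, failure -> { });
    }

    default void commitAsync(Runnable successCallback) {
        commitAsync(successCallback, failure -> { });
    }

    default void commitAsync(Consumer<Throwable> failureCallback) {
        commitAsync(() -> { }, failureCallback);
    }
}

// Hypothetical synchronous engine: persist first, then run exactly one callback.
class SynchronousSession implements AsyncCommitSession {
    final List<String> events = new ArrayList<>();
    private final boolean failCommit;

    SynchronousSession(boolean failCommit) {
        this.failCommit = failCommit;
    }

    // Stand-in for writing FlowFiles to the repositories.
    void commit() {
        if (failCommit) {
            throw new IllegalStateException("repository write failed");
        }
        events.add("committed");
    }

    @Override
    public void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        try {
            commit();
        } catch (RuntimeException e) {
            failureCallback.accept(e);  // commit failed: the success callback never runs
            return;
        }
        // Run outside the try so a failing callback is not reported as a failed commit.
        successCallback.run();
    }
}
```

With this shape, only the two-argument method needs an engine-specific implementation, and a processor can still write `session.commitAsync(message::acknowledge)`.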
So, instead of using logic such as:
{code:java}
JMSMessage message = jmsConsumer.consume();
FlowFile flowFile = createFlowFile(message, session);
session.commit();        // returns only after the FlowFile has been persisted
message.acknowledge();   // safe: NiFi now holds the data durably
{code}
The logic should be something more akin to:
{code:java}
JMSMessage message = jmsConsumer.consume();
FlowFile flowFile = createFlowFile(message, session);
// The engine decides when the acknowledgment runs: right after a synchronous
// commit, or only once the entire dataflow has completed.
session.commitAsync(message::acknowledge);
{code}
In the case of the traditional NiFi engine, {{void commitAsync(Runnable successCallback)}} would be implemented something like:
{code:java}
void commitAsync(Runnable successCallback) {
    commit();               // persist synchronously, exactly as commit() does today
    successCallback.run();  // the data is already durable, so run the callback now
}
{code}
However, Stateless (or any other engine that may be developed) could instead simply maintain a stack of callbacks, pushing the success callback onto the stack whenever commitAsync() is called. Only when the entire dataflow has completed would it unwind the stack of callbacks.

With this approach, ReplaceText in the dataflow described above can be called continually until its queue is emptied. Then the next Processor can be called continually until its queue is emptied. Even after the data has been merged, if the process dies or is restarted, GetFile's success callback has not been triggered, so the data is still available. Then, as soon as PutS3Object calls {{ProcessSession.commit()}} (because it is the last processor in the chain and it auto-terminates the FlowFile), the stack of callbacks can be unwound and called. As a result, GetFile's success callback is triggered only after the entire dataflow has completed successfully. If any processor along the way rolls back the session or throws an uncaught Exception, the entire session is rolled back, ensuring no data loss.
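The drain-then-advance scheduling and the stack of deferred callbacks can be sketched together in a toy engine. Everything here is hypothetical: ToyEngine, its simplified Processor interface, and the string-based stand-ins for GetFile, SplitText, ReplaceText, MergeContent, and PutS3Object are illustrations, not the Stateless NiFi engine. One simplification is assumed: a processor that cannot make progress while its queue is non-empty is treated as a failure that rolls the flow back.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Toy, single-threaded engine; all names are hypothetical.
class ToyEngine {

    // Simplified onTrigger: consume from 'in', emit to 'out', and return
    // false if no progress is possible (e.g. a merge still waiting for input).
    interface Processor {
        boolean onTrigger(Deque<String> in, Deque<String> out, ToyEngine engine);
    }

    private final Deque<Runnable> callbacks = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // Deferred: the callback is stacked here, not run.
    void commitAsync(Runnable successCallback) {
        callbacks.push(successCallback);
    }

    // Returns true if the whole flow completed and the callbacks were unwound.
    boolean run(List<Processor> chain, Deque<String> sourceQueue) {
        Deque<String> in = sourceQueue;
        try {
            for (Processor processor : chain) {
                Deque<String> out = new ArrayDeque<>();
                // Trigger this processor until its queue is empty before moving on.
                while (!in.isEmpty()) {
                    if (!processor.onTrigger(in, out, this)) {
                        throw new IllegalStateException("processor made no progress");
                    }
                }
                in = out;
            }
        } catch (RuntimeException e) {
            callbacks.clear();          // roll back: the source is never acknowledged
            log.add("rolled back");
            return false;
        }
        while (!callbacks.isEmpty()) {
            callbacks.pop().run();      // unwind, most recently registered first
        }
        return true;
    }
}

class ToyEngineDemo {

    // GetFile --> SplitText --> ReplaceText --> MergeContent --> PutS3Object,
    // reduced to string operations; 'mergeSize' is how many pieces the merge waits for.
    static List<ToyEngine.Processor> chain(int mergeSize) {
        return List.of(
            (in, out, engine) -> {                       // "GetFile"
                String flowFile = in.poll();
                out.add(flowFile);
                engine.commitAsync(() -> engine.log.add("ack " + flowFile));
                return true;
            },
            (in, out, engine) -> {                       // "SplitText"
                Collections.addAll(out, in.poll().split(","));
                return true;
            },
            (in, out, engine) -> {                       // "ReplaceText"
                out.add(in.poll().toUpperCase());
                return true;
            },
            (in, out, engine) -> {                       // "MergeContent"
                if (in.size() < mergeSize) {
                    return false;
                }
                out.add(String.join("", in));
                in.clear();
                return true;
            },
            (in, out, engine) -> {                       // "PutS3Object"
                engine.log.add("put " + in.poll());
                return true;
            });
    }

    public static void main(String[] args) {
        ToyEngine engine = new ToyEngine();
        engine.run(chain(3), new ArrayDeque<>(List.of("a,b,c")));
        System.out.println(engine.log); // [put ABC, ack a,b,c]
    }
}
```

Running `chain(3)` shows the "PutS3Object" step finishing before GetFile's acknowledgment is unwound. With `chain(4)` the merge can never be satisfied, `run()` returns false, and the stacked acknowledgment is discarded rather than run. Because each stage is drained before the next starts, the merge sees all three pieces at once, which the recursive per-commit model cannot guarantee.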
There are a few other important considerations to take note of:
* Because StandardProcessSession.commitAsync() would behave as described above, simply calling commit() and then any provided callbacks, the change in how traditional NiFi operates is quite minimal and therefore fairly low risk. The changes to Stateless NiFi are higher risk, but Stateless is generally still considered somewhat experimental and is expected to evolve considerably.
* This removes a very large barrier to entry for Stateless NiFi: today it is very difficult for users to know which flows can and cannot be run in Stateless. With these changes, Stateless should be able to run almost any flow that traditional NiFi can.
* Processors will need to move to the new API, but for probably more than 95% of them, the necessary changes will be trivial.
* The Mock Framework should be updated to fail on any call to ProcessSession.commit(), in order to ensure that the new commitAsync() is being used. The TestRunner should enable this check by default but allow the requirement for commitAsync() to be disabled by calling something like {{TestRunner.allowSynchronousCommits()}}. Of course, this would only apply once users move to version 1.14.0 (or whatever version this is released in) of the Mock Framework. Building against older versions of the Mock Framework would not expose this behavior.
* If any Processor still uses {{ProcessSession.commit()}} and is run in Stateless NiFi, the Stateless engine will then be required to trigger the next Processor in the flow before returning from {{ProcessSession.commit()}}.
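The proposed Mock Framework check can be sketched as a strict session that rejects commit() unless synchronous commits were explicitly allowed. StrictMockSession and its methods are hypothetical names chosen for illustration; they are not the nifi-mock API, and allowSynchronousCommits() only mirrors the "something like" name suggested above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mock ProcessSession; not the nifi-mock API.
class StrictMockSession {
    private boolean synchronousCommitsAllowed = false;
    final List<String> events = new ArrayList<>();

    // Mirrors the proposed opt-out (e.g. TestRunner.allowSynchronousCommits()).
    void allowSynchronousCommits() {
        synchronousCommitsAllowed = true;
    }

    // Fails by default, so processor tests surface any remaining commit() calls.
    void commit() {
        if (!synchronousCommitsAllowed) {
            throw new IllegalStateException(
                "ProcessSession.commit() was called; use commitAsync() or allow synchronous commits");
        }
        events.add("commit");
    }

    // In a single-threaded mock the data counts as persisted immediately,
    // so the success callback can run right away.
    void commitAsync(Runnable successCallback) {
        events.add("commitAsync");
        successCallback.run();
    }
}
```

Making the strict behavior the default means a processor that still calls commit() fails its unit tests until it is migrated or explicitly opts out.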