[ 
https://issues.apache.org/jira/browse/NIFI-8469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Payne updated NIFI-8469:
-----------------------------
    Fix Version/s: 1.14.0
       Resolution: Fixed
           Status: Resolved  (was: Patch Available)

> Change ProcessSession so that commits are asynchronous
> ------------------------------------------------------
>
>                 Key: NIFI-8469
>                 URL: https://issues.apache.org/jira/browse/NIFI-8469
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework, Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Critical
>             Fix For: 1.14.0
>
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Currently, ProcessSession.commit() guarantees that, when the method call 
> returns, all FlowFiles have been persisted to the repositories. As such, 
> it is safe to acknowledge/dispose of the data on the external system.
> For example, a processor that consumes from JMS will consume the message, 
> create a FlowFile from it, and then call ProcessSession.commit(). Only then 
> is it safe to acknowledge the message on the JMS broker. If it is 
> acknowledged prior to calling ProcessSession.commit(), and NiFi is restarted, 
> the data may be lost. But after calling ProcessSession.commit(), it is safe 
> because the content will be available to NiFi upon restart.
> This API has served us well. However, lately there has also been a great deal 
> of work and desire from the community to introduce other "runtimes." One of 
> those is MiNiFi. Another is Stateless NiFi.
> One of the distinguishing features of Stateless NiFi is that it stores 
> content in-memory. This means that fast (and large) disks are not necessary, 
> but it also means that upon restarting the application, all FlowFiles are 
> lost. In order to provide data reliability, Stateless NiFi requires that the 
> source of data be both reliable and replayable (like a JMS Broker or Apache 
> Kafka, for instance). In this way, we can keep content in-memory by avoiding 
> the message acknowledgment until after we've finished processing the message 
> completely. If the application is restarted in the middle, the message will 
> not have been acknowledged and as a result will be replayed, so we maintain 
> our strong at-least-once guarantees.
> Another distinguishing feature of Stateless NiFi is that it is 
> single-threaded. This allows us to be far more scalable and consume fewer 
> resources.
> This works by allowing ProcessSession.commit() to enqueue data for the next 
> Processor in the chain and then invoke the next Processor (recursively) 
> before ProcessSession.commit() ever returns.
> Unfortunately, though, some dataflows do not work well with such a model. Any 
> flow that has MergeContent or MergeRecord in the middle will end up in a 
> situation where the Processor never progresses. Take, for example, the 
> following dataflow:
> GetFile --> SplitText --> ReplaceText --> MergeContent --> PutS3Object
> In this case, assume that SplitText splits an incoming FlowFile into 10 
> smaller FlowFiles. ReplaceText performs some manipulation. MergeContent is 
> then expected to merge all 10 FlowFiles back into one.
> However, because of the nature of how this works, after SplitText, the queue 
> will have 10 FlowFiles. ReplaceText will then be called, which will consume 
> one FlowFile, manipulate it, and call ProcessSession.commit(). This will then 
> enqueue the FlowFile for MergeContent. MergeContent will be triggered but 
> will be unable to make progress because it doesn't have enough FlowFiles. The 
> only choice that the framework has is to then call MergeContent again until 
> its entire queue is emptied, but the queue will never empty. As a result, the 
> dataflow will end up in an infinite loop, calling MergeContent, which will 
> make no progress.
> What we really want to do is to call ReplaceText repeatedly until its queue is 
> empty and only then move on to the next Processor (MergeContent). 
> Unfortunately, this can't really be accomplished with the current semantics, 
> though. If we tried to do so, when ReplaceText is triggered the first time, 
> and it calls ProcessSession.commit(), we would have two choices:
>  * Recursively call ReplaceText.onTrigger(). This very quickly results in a 
> StackOverflowError, so this approach doesn't work well.
>  * Have ProcessSession.commit() block while another thread is responsible for 
> calling ReplaceText.onTrigger(). This results in spawning a new thread for 
> each FlowFile in the queue, which can very quickly exhaust the number of 
> threads, leading to an OutOfMemoryError (or, even worse, depending on 
> system/jvm settings, causing the entire operating system to crash).
> So neither approach is viable.
> Additionally, any dataflow that has a self-loop such as a failure loop has 
> the same issue as above, resulting in a StackOverflowError.
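
The recursive option above can be illustrated with a minimal sketch. The class and its methods are hypothetical stand-ins, not NiFi code. The only assumption is that a synchronous commit re-triggers the same processor before it returns; the sketch then records how deeply the calls nest.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in, not NiFi code: models a commit() that re-triggers
// the same processor before it returns.
class RecursiveCommitSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private int depth = 0;
    private int maxDepth = 0;

    RecursiveCommitSketch(int flowFiles) {
        for (int i = 0; i < flowFiles; i++) {
            queue.add("flowfile-" + i);
        }
    }

    // Processes one queued item, then commits.
    void onTrigger() {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        if (queue.poll() != null) {
            commit();
        }
        depth--;
    }

    // Synchronous commit: triggers the processor again while items remain.
    private void commit() {
        if (!queue.isEmpty()) {
            onTrigger();
        }
    }

    int maxDepth() {
        return maxDepth;
    }

    public static void main(String[] args) {
        RecursiveCommitSketch sketch = new RecursiveCommitSketch(10);
        sketch.onTrigger();
        System.out.println("nested onTrigger calls: " + sketch.maxDepth());
    }
}
```

In this model the nesting depth equals the number of queued items, so a sufficiently long queue (or a self-loop that keeps refilling it) exhausts the thread's stack.
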
> The idea here, then, is to deprecate ProcessSession.commit() in favor of a 
> new ProcessSession.commitAsync(). (Perhaps there will be a better name, but 
> we'll refer to it as such for the time being). The differentiator here is 
> that commitAsync() would allow for an optional callback method to be invoked 
> after the session commit completes:
>  * void commitAsync();
>  * void commitAsync(Runnable successCallback);
>  * void commitAsync(Consumer<Throwable> failureCallback);
>  * void commitAsync(Runnable successCallback, Consumer<Throwable> 
> failureCallback);
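
As a rough sketch of how the four overloads could relate to one another, the three shorter forms might delegate to the two-argument form. The interface name, the toy session, and the choice of no-op defaults are all assumptions made for illustration; none of this is the actual ProcessSession API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface mirroring the four proposed overloads; not the NiFi API.
interface AsyncCommitSketch {
    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback);

    default void commitAsync() {
        commitAsync(() -> { }, failure -> { });
    }

    default void commitAsync(Runnable successCallback) {
        commitAsync(successCallback, failure -> { });
    }

    default void commitAsync(Consumer<Throwable> failureCallback) {
        commitAsync(() -> { }, failureCallback);
    }
}

// Toy session whose commit either succeeds or fails, to show which callback fires.
class ToySession implements AsyncCommitSketch {
    private final boolean failOnCommit;

    ToySession(boolean failOnCommit) {
        this.failOnCommit = failOnCommit;
    }

    @Override
    public void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        if (failOnCommit) {
            failureCallback.accept(new IllegalStateException("commit failed"));
        } else {
            successCallback.run();
        }
    }
}

class AsyncCommitDemo {
    // Returns the events recorded by whichever callback was invoked.
    static List<String> run(boolean failOnCommit) {
        List<String> events = new ArrayList<>();
        new ToySession(failOnCommit).commitAsync(
                () -> events.add("acknowledged"),
                failure -> events.add("failed: " + failure.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Exactly one of the two callbacks runs per commit in this sketch, which is the property a caller such as the JMS consumer would rely on when deciding whether to acknowledge a message.
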
> Now, for most Processors, there is no need to call ProcessSession.commit() 
> because the abstract parent takes care of it. For those that do call 
> ProcessSession.commit(), it is typically because they need to perform some 
> cleanup action after the commit, such as in the JMS case illustrated above. 
> In this case, the logic would be updated in order to perform the cleanup in 
> the callback.
> So, instead of using logic such as:
> {code:java}
> JMSMessage message = jmsConsumer.consume();
> FlowFile flowFile = createFlowFile(message, session);
> session.commit();
> message.acknowledge();{code}
> The logic should be something more akin to:
> {code:java}
> JMSMessage message = jmsConsumer.consume();
> FlowFile flowFile = createFlowFile(message, session);
> session.commitAsync(message::acknowledge);{code}
> In the case of the traditional NiFi engine, {{void commitAsync(Runnable 
> successCallback)}} would be implemented something like:
> {code:java}
> void commitAsync(Runnable successCallback) {
>     // Persist synchronously, as commit() does today, then notify the caller.
>     commit();
>     successCallback.run();
> }{code}
> However, in Stateless (or any other engine that may be developed), the engine 
> could instead simply create a Stack of callbacks and add the success 
> callback to the stack when session.commitAsync() is called.
> Only when the entire dataflow has completed will it unwind 
> the stack of callbacks.
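
A minimal sketch of that deferred-callback idea follows. The class and method names (including dataflowCompleted) are hypothetical and do not come from the Stateless engine; the sketch only assumes that callbacks are pushed at commit time and popped once the whole flow has finished.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of an engine that defers success callbacks; not NiFi code.
class DeferredCallbackSketch {
    private final Deque<Runnable> callbacks = new ArrayDeque<>();

    // Records the callback instead of running it immediately.
    void commitAsync(Runnable successCallback) {
        callbacks.push(successCallback);
    }

    // Called once the entire dataflow has completed: unwind the stack.
    void dataflowCompleted() {
        while (!callbacks.isEmpty()) {
            callbacks.pop().run();
        }
    }

    // Registers two callbacks, shows that neither has run, then unwinds.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        DeferredCallbackSketch engine = new DeferredCallbackSketch();
        engine.commitAsync(() -> log.add("source acknowledged"));
        engine.commitAsync(() -> log.add("sink cleaned up"));
        log.add("callbacks pending: " + engine.callbacks.size());
        engine.dataflowCompleted();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because a stack is last-in, first-out, the most recently registered callback runs first and the source's acknowledgment runs last in this sketch.
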
> With this approach, it means that in the dataflow described above, 
> ReplaceText can be called continually until its queue is emptied. Then, the 
> next Processor can be called continually until its queue is emptied. Even if 
> the data has been merged, if the process dies or is restarted, the Success 
> Callback of GetFile has not been triggered so the data is still available.
> However, as soon as PutS3Object calls {{ProcessSession.commit()}} (because it 
> is the last processor in the chain and it auto-terminates the FlowFile) the 
> stack of callbacks can be unwound and called. As a result, GetFile's Success 
> Callback is triggered only after successful completion of the entire 
> dataflow. If any processor along the way rolls back the session or throws an 
> uncaught Exception, the entire session is rolled back, ensuring no data loss.
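
The processing order described above can be sketched with plain queues. The three steps below are simplified stand-ins for SplitText, ReplaceText and MergeContent, and the minimumEntries parameter is an assumption made for illustration rather than a real processor property.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for SplitText, ReplaceText and MergeContent; not NiFi code.
class DrainQueueSketch {
    static List<String> run(String content, int minimumEntries) {
        // SplitText stand-in: one queued item per line.
        Deque<String> replaceTextQueue = new ArrayDeque<>();
        for (String line : content.split("\n")) {
            replaceTextQueue.add(line);
        }

        // ReplaceText stand-in: triggered repeatedly until its queue is empty.
        Deque<String> mergeQueue = new ArrayDeque<>();
        while (!replaceTextQueue.isEmpty()) {
            mergeQueue.add(replaceTextQueue.poll().toUpperCase(Locale.ROOT));
        }

        // MergeContent stand-in: only triggered now, so it sees every split.
        List<String> merged = new ArrayList<>();
        if (mergeQueue.size() >= minimumEntries) {
            merged.add(String.join("\n", mergeQueue));
            mergeQueue.clear();
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(run("a\nb\nc", 3));
    }
}
```

Draining one stage fully before triggering the next is what lets the merge step see all of the splits at once, rather than being triggered with a single item and making no progress.
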
> There are a few other important considerations to take note of:
>  * Because the StandardProcessSession.commitAsync() would behave as described 
> above, just calling commit() and then any provided callbacks, the change in 
> how traditional NiFi operates is quite minimal and therefore fairly low risk. 
> The changes to Stateless NiFi are higher risk but Stateless is generally 
> still considered somewhat experimental and expected to evolve pretty heavily.
>  * This removes a very large barrier to entry for Stateless NiFi, which is 
> that it's very difficult for users to know which flows can and cannot be used 
> in Stateless. With these changes, Stateless should be able to run almost any 
> flow that traditional NiFi can.
>  * This will mean that processors should change to a new API, but for 
> probably > 95% of processors, the changes necessary will be trivial.
>  * The Mock Framework should be updated to cause a failure on a call to 
> ProcessSession.commit() in order to ensure that the new commitAsync() is 
> being used. The TestRunner should enable this by default but allow the 
> requirement for commitAsync() to be disabled by calling something like 
> {{TestRunner.allowSynchronousCommits()}}. Of course, this would only occur 
> when users choose to change to 1.14.0 (or whatever version this is released 
> in) of the Mock Framework. Building against older versions of the Mock 
> Framework would not expose this behavior.
>  * If any Processor does still use {{ProcessSession.commit()}} and is run in 
> Stateless NiFi, the Stateless engine will be required to then trigger the 
> next Processor in the flow before returning from {{ProcessSession.commit()}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
