markap14 commented on PR #10769:
URL: https://github.com/apache/nifi/pull/10769#issuecomment-3855308089

   Hey @pvillard31, looking through this I'm not 100% sure, because this 
Processor/CS pair is a bit complex, but I *think* the approach here could lead 
to data loss. Specifically, it is crucial that we do NOT commit any offsets to 
Kafka until after the Processor has called `ProcessSession.commitAsync` AND the 
callback has been invoked. Otherwise, it is possible that a sequence of events 
like the following occurs:
   - Processor receives events but hasn't written them out
   - Rebalance occurs and the offsets get committed
   - Processor fails to serialize them because the Content Repo is out of disk space or something like that
   - Now we've already committed the offsets and the data is lost; we can't replay it.
   
   Or, similarly, we write the records just fine but session commit fails:
   - Processor receives events and writes them to the FlowFile
   - Rebalance occurs and the offsets get committed
   - Processor commits the session, but the commit fails because the FlowFile Repository is out of disk space
   - Now we've already committed the offsets and the data is lost; we can't replay it.
   
   Or, the process dies / is restarted:
   - Processor receives events and writes them to the FlowFile
   - Rebalance occurs and the offsets get committed
   - Process is killed before session commit happens
   - Now we've already committed the offsets and the data is lost; we can't replay it.
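
   To make the ordering concrete, here is a minimal sketch of what I mean. The class and method names (`SafeOffsetCommit`, `commitSessionThenOffsets`, `pendingOffsets`) are hypothetical, and a real implementation would need to marshal the offset commit back onto the consumer's own thread, since `KafkaConsumer` is not thread-safe; the point is only that the Kafka offsets are committed from the success callback of `ProcessSession.commitAsync` and nowhere else:

   ```java
   import java.util.Map;

   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.nifi.processor.ProcessSession;

   // Hypothetical helper illustrating the ordering constraint only.
   class SafeOffsetCommit {

       void commitSessionThenOffsets(final ProcessSession session,
                                     final Consumer<byte[], byte[]> kafkaConsumer,
                                     final Map<TopicPartition, OffsetAndMetadata> pendingOffsets) {

           // Commit the NiFi session first. Only in the success callback do we
           // commit the offsets back to Kafka. If the session commit fails (e.g.,
           // the FlowFile Repository is out of disk space), the callback never
           // fires, the offsets are never committed, and the records can be
           // re-consumed: duplicates are possible, but data loss is not.
           session.commitAsync(() -> kafkaConsumer.commitSync(pendingOffsets));
       }
   }
   ```

   That keeps the semantics at-least-once: the worst case after a failure is re-delivering records whose offsets were never committed.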
   
   You did note above in the *Important Note* that it may make sense to add 
some more coupling here, where the Processor is made aware of a rebalance, but 
I think that coupling is absolutely required if we are to commit the offsets on 
rebalance. I think we could avoid the callback mechanism, though, as that is 
definitely messy. Rather, the Processor is already polling the service. We 
could have `poll()` get interrupted when a rebalance occurs, and have the 
Processor always check, before each call to `poll()`, whether a rebalance has 
occurred. If so, it would be responsible for committing the session, and only 
then, upon successful session commit, could it trigger the Kafka offset commit.
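
   Something along these lines is what I'm picturing (again just a sketch with hypothetical names; it glosses over how the listener gets registered, the single-threaded nature of `KafkaConsumer`, and the fact that committing offsets for already-revoked partitions can fail, which is exactly where the extra coupling comes in):

   ```java
   import java.util.Collection;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicBoolean;

   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.nifi.processor.ProcessSession;

   // Hypothetical illustration of the "check before poll()" idea.
   class RebalanceAwarePoller {

       private final AtomicBoolean rebalanceOccurred = new AtomicBoolean(false);

       // Would be passed to consumer.subscribe(topics, listener) by the service.
       final ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
           @Override
           public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
               // Do NOT commit offsets here; just record that a rebalance happened
               // so the Processor can reconcile its session first.
               rebalanceOccurred.set(true);
           }

           @Override
           public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
           }
       };

       // Called by the Processor before each poll() of the service.
       void beforeNextPoll(final ProcessSession session,
                           final Consumer<byte[], byte[]> kafkaConsumer,
                           final Map<TopicPartition, OffsetAndMetadata> pendingOffsets) {

           if (rebalanceOccurred.compareAndSet(true, false)) {
               // Session commit first; Kafka offset commit only once the session
               // commit has succeeded.
               session.commitAsync(() -> kafkaConsumer.commitSync(pendingOffsets));
           }
       }
   }
   ```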
   
   Please let me know if I'm missing something here, but I think this approach 
essentially trades away the potential for duplicates in exchange for the 
potential for data loss, and we always want to prefer duplicates over data loss.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
