Hey Theo,

So there’s quite a lot to unpack here. Possibly entire books could be written 
on some of this stuff. And there’s a lot that is misunderstood about Apache 
Kafka’s exactly once semantics (EOS). In fact, I’m working on a YouTube video 
and blog that will cover a lot of this in more detail.

But the super short version of it is:

- Apache NiFi does not currently support Kafka's exactly-once semantics, even 
though it does support Kafka transactions.
- I don’t know a lot about Flink. But the way that Kafka’s EOS works is that 
offsets that are consumed by a consumer are then committed not by the consumer 
but rather a Kafka Producer. As a result, if there’s some failure or 
cancellation, the transaction is never completed and everything is rolled back. 
This is what provides the Exactly Once Semantics. Because of that, Exactly Once 
semantics ONLY apply when a given Kafka cluster is both the source of the data 
AND the destination of the data. You cannot guarantee Exactly Once when sending 
a file from a file system. This is discussed more at [1]. There’s a diagram 
there that shows this as the “wild west”.
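To make that mechanism concrete, here is a toy simulation of Kafka's consume-transform-produce pattern. This is plain Python, not the real Kafka client API (the broker, topics, and transaction are stand-in data structures invented for this sketch), but the shape matches what the real `sendOffsetsToTransaction()` flow gives you: the output records and the consumer's offset commit belong to one producer transaction, so a failure before commit rolls back both, and a retry cannot produce duplicates.

```python
# Toy model of Kafka's consume-transform-produce transaction.
# NOT the real Kafka API: broker, topics, and transaction are
# simulated with plain Python data structures for illustration.

class Broker:
    def __init__(self, source):
        self.source = source          # records in the source topic
        self.sink = []                # records in the destination topic
        self.committed_offset = 0     # consumer group's committed offset

class Transaction:
    """Buffers output records AND the offset commit until commit()."""
    def __init__(self, broker):
        self.broker = broker
        self.pending_records = []
        self.pending_offset = None

    def send(self, record):
        self.pending_records.append(record)

    def send_offsets(self, offset):
        # Mirrors producer.sendOffsetsToTransaction(): the offset
        # commit rides along inside the producer's transaction.
        self.pending_offset = offset

    def commit(self):
        self.broker.sink.extend(self.pending_records)
        self.broker.committed_offset = self.pending_offset

    def abort(self):
        # Crash/cancel: nothing becomes visible, all-or-nothing.
        self.pending_records = []
        self.pending_offset = None

def process(broker, fail_before_commit=False):
    """Consume everything past the committed offset, transform, produce."""
    txn = Transaction(broker)
    offset = broker.committed_offset
    for record in broker.source[offset:]:
        txn.send(record.upper())      # the "transform" step
        offset += 1
    txn.send_offsets(offset)
    if fail_before_commit:
        txn.abort()
    else:
        txn.commit()

broker = Broker(source=["a", "b", "c"])
process(broker, fail_before_commit=True)   # first attempt fails mid-flight
process(broker)                            # retry re-reads from offset 0
print(broker.sink)                         # ['A', 'B', 'C'] -- no duplicates
```

Because the aborted transaction made neither the output records nor the offset commit visible, the retry starts from the old offset and the sink ends up with exactly one copy of each record. This atomic pairing is precisely what is lost when the source is a file system rather than the same Kafka cluster.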

In terms of what about NiFi’s architecture causes conflicts:
- NiFi’s components are, by design, loosely coupled. Processor A doesn’t know 
anything about Processor B. This was done for many reasons and most of the time 
is exactly what we want and has a lot of advantages. But in order to play in 
Kafka’s EOS world, you need a tighter coupling - if processing fails or the 
publisher can’t send data back, the consumer needs to know to rollback its 
offsets.
- Secondly, NiFi persists data across restarts. If NiFi is restarted without 
the transaction being committed, it would pull a second copy of the data on 
restart, but the data that was already consumed would still be processed, so 
you’d end up with duplicates (i.e., at-least-once semantics).
- Finally, we need to take into consideration data ordering guarantees. 
Specifically, if we have two FlowFiles, one with Kafka offset 10 and one with 
Kafka offset of 100, then we must NOT send the data produced from offset 100 in 
a transaction before a different transaction sends the data from offset 10. 
This becomes difficult to guarantee with NiFi, since the dataflow may split 
data apart, filter some, introduce many branches that take a long time to 
complete, etc.
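A hypothetical sketch of that ordering hazard: two FlowFiles travel different branches of a flow, and the slower branch happens to hold the earlier offset. The FlowFile offsets and the "branch" timing below are made up for illustration, but they show why committing each branch's Kafka transaction independently would be unsafe.

```python
# Hypothetical illustration of the ordering hazard. The offsets and
# branch timings are invented for this sketch; real NiFi does not
# commit Kafka transactions this way (which is exactly the point).

committed = []   # order in which transactions commit their offsets

def commit_transaction(next_offset):
    """Marks everything below next_offset as consumed."""
    committed.append(next_offset)

# FlowFile A (offset 10) goes down a slow branch (enrichment, retries...);
# FlowFile B (offset 100) goes down a fast branch and finishes first.
commit_transaction(101)   # B commits: offsets up to 100 now look "done"
# -- crash here: A (offset 10) was never sent --
# On restart, consumption resumes from the highest committed offset:
resume_offset = max(committed)
print(resume_offset)      # 101 -> offset 10 is skipped, its data is lost
```

Committing offset 101 before offset 10's data was sent turns a crash into silent data loss, so transactions would have to commit strictly in offset order, which a branching, filtering dataflow cannot easily guarantee.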

With the upcoming changes in 1.15.0 (assuming all goes as planned and that the 
proposed components are all merged, etc.) we will introduce the capabilities 
necessary. We’ll have the ability to run a dataflow using the Stateless NiFi 
engine much more easily, and as I said I’ll be putting together a YouTube Video 
and/or blog that demonstrates all of this.

Hope this is helpful!
-Mark

[1] https://www.confluent.io/blog/chain-services-exactly-guarantees/


On Oct 2, 2021, at 8:37 PM, Theo Diefenthal 
<[email protected]> 
wrote:

Hi there,

At the moment, I'm wondering whether NiFi provides end-to-end exactly-once 
semantics in a case where it should in general be possible (speaking not in 
terms of NiFi but from a technical viewpoint). My example: reading a file from 
the filesystem and writing its contents to Kafka. Apache Flink, for instance, 
can provide this guarantee when checkpoints are enabled and the producer is set 
up in exactly-once mode (i.e., with transactions enabled).

With regards to NiFi, I read some unclear statements:

- In the NiFi docs [1], I found nothing with regards to exactly-once but only a 
statement letting me deduce at-least-once delivery: "A core philosophy of NiFi 
has been that even at very high scale, guaranteed delivery is a must. This is 
achieved through effective use of a purpose-built persistent write-ahead log 
and content repository."
- In a NiFi crash course from 2018 by Hortonworks [2], I found a slide on 
challenges for dataflow systems [at time 08:43] where the "'Exactly once' 
delivery" problem is mentioned twice. As this is a crash course on, and an 
advertisement for, NiFi, I think I can/should deduce from this slide that NiFi 
addresses and solves that exactly-once challenge for me.
- In the book "Practical Real-time Data Processing and Analytics" from 2017 by 
Shilpi Saxena and Saurabh Gupta, it is written: "Let's start with NiFi. It is a 
guaranteed delivery processing engine (exactly once) by default, which 
maintains write-ahead logs and a content repository to achieve this."
- In [3], back in 2016, before Kafka had released version 0.11 (which for the 
first time allowed end-to-end exactly-once semantics when producing), Mark 
Payne, a lead developer of NiFi, wrote that "NiFi generally will guarantee At 
Least Once delivery of your data", which is in sync with the statements of the 
current NiFi docs, I'd say. But he also wrote that, in general, exactly-once 
semantics can be achieved for a distributed system if the source is replayable 
and the sink can go along with that (he explicitly mentions deduplication, but 
I think from today's perspective, a sink supporting a two-phase commit (like 
Kafka) should in general work as well, as Flink demonstrates).
- In [4], in 2017, a community user wrote a knowledge article where he 
explicitly mentions two-phase commits in NiFi for Kafka and NiFi site-to-site 
communication, which, according to his words, still provide only at-least-once 
semantics (but very robust ones, "close" to exactly-once).
- A few days ago [5], Mark Payne created a JIRA issue for a new feature, 
implemented in the upcoming NiFi 1.15, which will allow "Exactly Once 
Semantics" (EOS) for stateless pipelines, e.g. from Kafka to Kafka in NiFi 
stateless mode. In the introduction of that story, he writes: "While there are 
benefits to being able to do so [implementing exactly once in standard NiFi], 
the requirements that Kafka puts forth don't really work well with NiFi's 
architecture." From that statement, I'd deduce that current NiFi doesn't have 
exactly-once semantics and that it sadly doesn't fit well into NiFi's 
architecture.

In summary, it seems that general NiFi (not the upcoming 1.15 stateless mode) 
doesn't support (end-to-end) exactly-once semantics, am I right? Why doesn't it 
fit well with the architecture of NiFi? From my limited understanding of NiFi 
(I am currently evaluating it, playing around with it for the first time), it 
should in general be possible. Each processor works with transactions which can 
be rolled back or committed and persisted to a write-ahead log. So if we don't 
face a hardware disk failure losing the hard drive NiFi is running on, 
exactly-once semantics should be possible between each pair of processors and 
hence, with transactions/two-phase commit in the Kafka producer, fully end-to-
end exactly-once (semantics)?!

Best regards
Theo

[1] https://nifi.apache.org/docs/nifi-docs/html/overview.html
[2] https://www.youtube.com/watch?v=fblkgr1PJ0o
[3] 
https://community.cloudera.com/t5/Support-Questions/Can-nifi-promise-each-of-the-flowfiles-can-be-processed/m-p/141655
[4] 
https://community.cloudera.com/t5/Community-Articles/At-least-once-delivery-vs-exactly-once-delivery-semantics-in/ta-p/244688
[5] https://issues.apache.org/jira/browse/NIFI-9239
