Hi Mark, 

Thank's a lot for your explanations. 

Sad to hear that NiFi doesn't support Exactly-Once out-of-the-box, but totally 
understandable. 

I think, your argumentation is not fully complete though. I might be wrong 
here, but the following is my understanding: 
You write that Kafka EOS works in a way that a transaction is fully handled by 
the producer. The producer _atomically_ takes care of committing source offsets 
and committing produced messages ( And in kafka streams also commits internal 
streaming state as well ). You conclucde that Kafka-EOS only apply when a given 
kafka cluster is both source and destination of the data. That's the 
understanding I also got when I first read the confluent blog series about 
exactly once processing. What confluent does in those posts: They explicitly 
describe how exactly once semantics are implemented in a kafka streaming 
application (reading from kafka and writing to the same kafka). Of course, if 
they are in "their own world", controlling source, state and sink, they can be 
more efficient with transactions and exactly once guarantees and that's what 
they describe. But the transaction concept of kafka applies to a broader range 
of tools (at least if one doesn't expect _atomic_ updates of source read and 
sink written simultaenously. But I also don't know why someone would need that) 
. 
When I read more about Apache Flink, I understood that the exactly once 
semantics of Kafka are not bound to processing in the kafka domain only. Kafka 
Producers implement a two-Phase-Commit protocol and one can utilize that with 
his own application. Apache Flink and Apache Spark both promise that. If one 
has a replayble source in Spark or Flink (Like a kafka topic) and one has a 
sink that is either idempotent or supports two-phase-commits, both Flink and 
Spark provide exactly once semantics on demand (Of course, your processing job 
must be free of side-effects). See [1] for instance. 

With the two-phase-commit sink, in the end, it comes down that you kind of need 
to manage your entire state in one system. In Kafka-Streams, they manage all 
their state in kafka (and committing atomically to that). In Flink, it is 
managed in the configured state backend, which can be a standard filesystem, 
HDFS, s3, whatever... 
Let's take a look at a very simple example reading from a Kafka cluster, 
writing to another kafka cluster with a SQL DB in the middle as state store. 
(Of course that's not performant): 
1. I can read messages from kafka and write those to a SQL DB. If I manage my 
consumer offsets myself within the SQL DB where I store the kafka records as 
well, I can easily make exactly once guarantees. Either I commit a transaction 
(with new consumed records and offsets) atomically, or I abort it. If something 
fails, I redo the process (Hence replayable source needed) 
2. More tricky: I read records from the DB, and within another DB transaction, 
I write a batch of messages to kafka (in a kafka transaction) and if Kafka 
succeeds the pre-commit, I commit my SQL DB transaction with the required infos 
(transactional id, producer id, transaction epoch). With some more 
communications with the DB, eventually I end up with exactly once semantics 
from DB to Kafka. (Kafka transactions with two phase commit. If something 
crashes after the pre-commit, I can restart the producer, reassign 
transactional id, resumue transaction with producerId and epoch and retry the 
commit until it eventually succeeds). 
In summary, I designed end-to-end exactly once from one kafka cluster to 
another kafka cluster. [If we want to introduce some new naming here, we could 
call that eventual end-to-end exactly once :) Source and Sink not updated 
simultaenously as in Kafka (atomic), but utilized some intermediate store to 
track progress]. 
Of course, the devil is in the detail and the implementation of such logic is 
very tricky. One has to take care of lots of possible failure cases, but 
technically it's possible. If that's technically possible within the current 
NiFi architecture is another question though which I can't answer as I don't 
have enough understanding of NiFi. For instance, I think for two phase commits, 
one needs some kind of "coordinator". In Flink, that's the Jobmanager. In NiFi, 
I don't know if there is such thing. I just wanted to add my two cents, that in 
general, kafka can provide exactly once semantics even when not fully operating 
withing a single kafka cluster only but having some other compatible sources 
and/or sinks involved. 

[1] [ 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
 | 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
 ] 

Best regards 
Theo 


Von: "markap14" <[email protected]> 
An: "Theo Diefenthal" <[email protected]> 
CC: "users" <[email protected]> 
Gesendet: Sonntag, 3. Oktober 2021 16:04:26 
Betreff: Re: Does NiFi provide end-to-end exactly once semantics with kafka 
sink and replayable source? 

Hey Theo, 

So there’s quite a lot to unpack here. Possibly entire books could be written 
on some of this stuff. And there’s a lot that is misunderstood about Apache 
Kafka’s exactly once semantics (EOS). In fact, I’m working on a YouTube video 
and blog that will cover a lot of this in more detail. 

But the super short version of it is: 

- Apache NiFi does not currently support Kafka's Exactly Once, even though it 
does support Kafka transactions 
- I don’t know a lot about a Flink. But the way that Kafka’s EOS works is that 
offsets that are consumed by a consumer are then committed not by the consumer 
but rather a Kafka Producer. As a result, if there’s some failure or 
cancellation, the transaction is never completed and everything is rolled back. 
This is what provides the Exactly Once Semantics. Because of that, Exactly Once 
semantics ONLY apply when a given Kafka cluster is both the source of the data 
AND the destination of the data. You cannot guarantee Exactly Once when sending 
a file from a file system. This is discussed more at [1]. There’s a diagram 
there that shows this as the “wild west”. 

In terms of what about NiFi’s architecture causes conflicts: 
- NiFi’s components are, by design, loosely coupled. Processor A doesn’t know 
anything about Processor B. This was done for many reasons and most of the time 
is exactly what we want and has a lot of advantages. But in order to play in 
Kafka’s EOS world, you need a tighter coupling - if processing fails or the 
publisher can’t send data back, the consumer needs to know to rollback its 
offsets. 
- Secondly, NiFi persists the data across restarts. If NiFi is restarted, 
without the transaction being committed, it would result in NiFi pulling a 
second copy of the data on restart, but the data that already was consumed will 
still be processed, so you’d end up with duplicate (at least once). 
- Finally, we need to take into consideration data ordering guarantees. 
Specifically, if we have two FlowFiles, one with Kafka offset 10 and one with 
Kafka offset of 100, then we must NOT send the data produced from offset 100 in 
a transaction before a different transaction sends the data from offset 10. 
This becomes difficult to guarantee with NiFi, since the dataflow may split 
data apart, filter some, introduce many branches that take a long time to 
complete, etc. 

With the upcoming changes in 1.15.0 (assuming all goes as planned and that the 
proposed components are all merged, etc.) we will introduce the capabilities 
necessary. We’ll have the ability to run a dataflow using the Stateless NiFi 
engine much more easily, and as I said I’ll be putting together a YouTube Video 
and/or blog that demonstrates all of this. 

Hope this is helpful! 
-Mark 

[1] [ https://www.confluent.io/blog/chain-services-exactly-guarantees/ | 
https://www.confluent.io/blog/chain-services-exactly-guarantees/ ] 





On Oct 2, 2021, at 8:37 PM, Theo Diefenthal < [ 
mailto:[email protected] | [email protected] ] 
> wrote: 

Hi there, 

At the moment, I'm wondering whether NiFi provides end-to-end exactly once 
semantics in a place where it should in general be possible (speaking not in 
terms of NiFi but from a technical viewpoint). My example: Reading a file from 
filesystem and write the contents of that file to Kafka. Apache Flink for 
instance can provide this guarantee when having checkpoints enabled and setting 
up the producer in an exactly once mode (i.e. enable transactions) 

With regards to NiFi, I read some unclear statements: 

- In the NiFi docs [1], I found nothing with regards to exactly-once but only a 
statement letting me decude at-least-once delivery: "A core philosophy of NiFi 
has been that even at very high scale, guaranteed delivery is a must. This is 
achieved through effective use of a purpose-built persistent write-ahead log 
and content repository." 
- In a NiFi crash course from 2018 by HortonWorks [2], I found a slide with 
challenges for a DataFlow systems [at time 08:43] where the ""Exactly once" 
delivery" problem is mentioned twice. As this is a crash course of and 
advertising NiFi, I think I can/should deduce from this slide that NiFi 
addresses and solves that exactly-once challenge for me. 
- In the book "Practical Real-time Data Processing and Analytics" from 2017 by 
Shilpi Saxena und Saurabh Gupta, it is written: "Let's start with NiFi. It is a 
guaranteed delivery processing engine (exactly once) by default, which 
maintains write-ahead logs and a content repository to achieve this." 
- In [3], back in 2016 where Kafka didn't yet released version 0.11 which 
allowed for the first time end-to-end exactly-once semantics in producing, Mark 
Payne as a lead developer of NiFi wrote that " NiFi generally will guarantee At 
Least Once delivery of your data ", which is in sync with the statements of the 
current NiFi docs I'd say. But he also wrote that in general, exactly once 
semantics can be achieved for a distributed system if the source is replayable 
and the sink can go along with that (he explicitly mentions deduplication, but 
I think from a todays perspective, a sink supporting a two phase commit (like 
kafka) should in general work as well as Flink demonstrates). 
- In [4], in 2017 some community user wrote a knowledge article where he 
explicitly mentions Two-Phase-Commits in NiFi for Kafka and NiFi Site-to-Site 
communications which according to his words still provide only at-least-once 
semantics (But very robust ones, "close" to exactly-once). 
- A few days ago [5], Mark Payne created a JIRA issue for a new feature 
implemented in upcoming NiFi 1.15 which will allow "Exactly Once Semantics" 
(EOS) for stateless pipelines e.g. from Kafka to Kafka in NiFi stateless mode. 
In the introduction of that story, he talks about "While there are benefits to 
being able to do so [Implementing exactly once in standard NiFi], the 
requirements that Kafka puts forth don't really work well with NiFi's 
architecture.". So from that statement, I'd deduce that the current NiFi 
doesn't have exactly once semantics and that it sadly doesn't fit well into 
NiFi architecture. 

In summary, it seems that general NiFi (not upcoming 1.15 stateless) doesn't 
support (End-to-End) Exactly-Once-Semantics, am I right? Why doesn't it fit 
well to the architecutre of NiFi? From my limited understanding of NiFi (I am 
currently evaluating it playing around with it for the first time), it should 
in general be possible. Each processor works with transactions which can be 
rolled backed or committed and being persisted to a write ahead log. So if we 
don't face a hardware disk failure losing the hard drive NiFi is running on, 
exactly-once semantics should be possible between from and to each processor 
and hence, with transactions/two-phase-commit in the kafka producer, fully end 
to end exactly once (semantics)?! 

Best regards 
Theo 

[1] [ https://nifi.apache.org/docs/nifi-docs/html/overview.html | 
https://nifi.apache.org/docs/nifi-docs/html/overview.html ] 
[2] [ https://www.youtube.com/watch?v=fblkgr1PJ0o | 
https://www.youtube.com/watch?v=fblkgr1PJ0o ] 
[3] [ 
https://community.cloudera.com/t5/Support-Questions/Can-nifi-promise-each-of-the-flowfiles-can-be-processed/m-p/141655
 | 
https://community.cloudera.com/t5/Support-Questions/Can-nifi-promise-each-of-the-flowfiles-can-be-processed/m-p/141655
 ] 
[4] [ 
https://community.cloudera.com/t5/Community-Articles/At-least-once-delivery-vs-exactly-once-delivery-semantics-in/ta-p/244688
 | 
https://community.cloudera.com/t5/Community-Articles/At-least-once-delivery-vs-exactly-once-delivery-semantics-in/ta-p/244688
 ] 
[5] [ https://issues.apache.org/jira/browse/NIFI-9239 | 
https://issues.apache.org/jira/browse/NIFI-9239 ] 



Reply via email to