Hey Theo, So there’s quite a lot to unpack here. Possibly entire books could be written on some of this stuff. And there’s a lot that is misunderstood about Apache Kafka’s exactly once semantics (EOS). In fact, I’m working on a YouTube video and blog that will cover a lot of this in more detail.
But the super short version of it is: - Apache NiFi does not currently support Kafka's Exactly Once, even though it does support Kafka transactions - I don’t know a lot about a Flink. But the way that Kafka’s EOS works is that offsets that are consumed by a consumer are then committed not by the consumer but rather a Kafka Producer. As a result, if there’s some failure or cancellation, the transaction is never completed and everything is rolled back. This is what provides the Exactly Once Semantics. Because of that, Exactly Once semantics ONLY apply when a given Kafka cluster is both the source of the data AND the destination of the data. You cannot guarantee Exactly Once when sending a file from a file system. This is discussed more at [1]. There’s a diagram there that shows this as the “wild west”. In terms of what about NiFi’s architecture causes conflicts: - NiFi’s components are, by design, loosely coupled. Processor A doesn’t know anything about Processor B. This was done for many reasons and most of the time is exactly what we want and has a lot of advantages. But in order to play in Kafka’s EOS world, you need a tighter coupling - if processing fails or the publisher can’t send data back, the consumer needs to know to rollback its offsets. - Secondly, NiFi persists the data across restarts. If NiFi is restarted, without the transaction being committed, it would result in NiFi pulling a second copy of the data on restart, but the data that already was consumed will still be processed, so you’d end up with duplicate (at least once). - Finally, we need to take into consideration data ordering guarantees. Specifically, if we have two FlowFiles, one with Kafka offset 10 and one with Kafka offset of 100, then we must NOT send the data produced from offset 100 in a transaction before a different transaction sends the data from offset 10. This becomes difficult to guarantee with NiFi, since the dataflow may split data apart, filter some, introduce many branches that take a long time to complete, etc. With the upcoming changes in 1.15.0 (assuming all goes as planned and that the proposed components are all merged, etc.) we will introduce the capabilities necessary. We’ll have the ability to run a dataflow using the Stateless NiFi engine much more easily, and as I said I’ll be putting together a YouTube Video and/or blog that demonstrates all of this. Hope this is helpful! -Mark [1] https://www.confluent.io/blog/chain-services-exactly-guarantees/ On Oct 2, 2021, at 8:37 PM, Theo Diefenthal <[email protected]<mailto:[email protected]>> wrote: Hi there, At the moment, I'm wondering whether NiFi provides end-to-end exactly once semantics in a place where it should in general be possible (speaking not in terms of NiFi but from a technical viewpoint). My example: Reading a file from filesystem and write the contents of that file to Kafka. Apache Flink for instance can provide this guarantee when having checkpoints enabled and setting up the producer in an exactly once mode (i.e. enable transactions) With regards to NiFi, I read some unclear statements: - In the NiFi docs [1], I found nothing with regards to exactly-once but only a statement letting me decude at-least-once delivery: "A core philosophy of NiFi has been that even at very high scale, guaranteed delivery is a must. This is achieved through effective use of a purpose-built persistent write-ahead log and content repository." - In a NiFi crash course from 2018 by HortonWorks [2], I found a slide with challenges for a DataFlow systems [at time 08:43] where the ""Exactly once" delivery" problem is mentioned twice. As this is a crash course of and advertising NiFi, I think I can/should deduce from this slide that NiFi addresses and solves that exactly-once challenge for me. - In the book "Practical Real-time Data Processing and Analytics" from 2017 by Shilpi Saxena und Saurabh Gupta, it is written: "Let's start with NiFi. It is a guaranteed delivery processing engine (exactly once) by default, which maintains write-ahead logs and a content repository to achieve this." - In [3], back in 2016 where Kafka didn't yet released version 0.11 which allowed for the first time end-to-end exactly-once semantics in producing, Mark Payne as a lead developer of NiFi wrote that " NiFi generally will guarantee At Least Once delivery of your data ", which is in sync with the statements of the current NiFi docs I'd say. But he also wrote that in general, exactly once semantics can be achieved for a distributed system if the source is replayable and the sink can go along with that (he explicitly mentions deduplication, but I think from a todays perspective, a sink supporting a two phase commit (like kafka) should in general work as well as Flink demonstrates). - In [4], in 2017 some community user wrote a knowledge article where he explicitly mentions Two-Phase-Commits in NiFi for Kafka and NiFi Site-to-Site communications which according to his words still provide only at-least-once semantics (But very robust ones, "close" to exactly-once). - A few days ago [5], Mark Payne created a JIRA issue for a new feature implemented in upcoming NiFi 1.15 which will allow "Exactly Once Semantics" (EOS) for stateless pipelines e.g. from Kafka to Kafka in NiFi stateless mode. In the introduction of that story, he talks about "While there are benefits to being able to do so [Implementing exactly once in standard NiFi], the requirements that Kafka puts forth don't really work well with NiFi's architecture.". So from that statement, I'd deduce that the current NiFi doesn't have exactly once semantics and that it sadly doesn't fit well into NiFi architecture. In summary, it seems that general NiFi (not upcoming 1.15 stateless) doesn't support (End-to-End) Exactly-Once-Semantics, am I right? Why doesn't it fit well to the architecutre of NiFi? From my limited understanding of NiFi (I am currently evaluating it playing around with it for the first time), it should in general be possible. Each processor works with transactions which can be rolled backed or committed and being persisted to a write ahead log. So if we don't face a hardware disk failure losing the hard drive NiFi is running on, exactly-once semantics should be possible between from and to each processor and hence, with transactions/two-phase-commit in the kafka producer, fully end to end exactly once (semantics)?! Best regards Theo [1] https://nifi.apache.org/docs/nifi-docs/html/overview.html [2] https://www.youtube.com/watch?v=fblkgr1PJ0o [3] https://community.cloudera.com/t5/Support-Questions/Can-nifi-promise-each-of-the-flowfiles-can-be-processed/m-p/141655 [4] https://community.cloudera.com/t5/Community-Articles/At-least-once-delivery-vs-exactly-once-delivery-semantics-in/ta-p/244688 [5] https://issues.apache.org/jira/browse/NIFI-9239
