Thanks for your advice, Piotr. Firstly, yes, we are aware that even with a clean shutdown we can end up with duplicated messages after a crash; that is acceptable, as it is rare and unintentional, unlike deploying new business code or scaling up/down.
I made a fork of the 1.2.1 version, which we currently use, and developed a simple POC based on that solution: a boolean stopSourceSavepoint is passed from the job manager to the source when a cancel with savepoint is triggered. This is the altered code:
https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint

We tested it with our production workload and there are no duplicated messages any more, whereas hundreds of thousands were duplicated before.

I plan to reapply/adapt this patch for the 1.3.2 release when we migrate to it, and maybe later for 1.4. I'm open to suggestions, and happy to help develop this feature upstream if you want.

On Mon, Oct 2, 2017 at 7:09 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> We are planning to work on this clean shutdown after releasing Flink 1.4.
> Implementing it properly would require some work, for example:
> - adding checkpoint options to carry information about a
>   "closing"/"shutting down" event
> - adding clean shutdown to the source functions API
> - implementing handling of this clean shutdown in the desired sources
>
> Those are not super complicated changes, but they are also not trivial.
>
> One thing that you could do is implement a super hacky filter function
> just after the source operator, which you would trigger manually.
> Normally it would pass all messages through. Once triggered, it would
> wait for the next checkpoint to happen, assume that it is a savepoint,
> and start filtering out all subsequent messages. When this checkpoint
> completes, you could manually shut down your Flink application. This
> could guarantee that there are no duplicated writes after a restart. It
> might work for a clean shutdown, but it would be a very hacky solution.
>
> Btw, keep in mind that even with a clean shutdown you can end up with
> duplicated messages after a crash, and there is no way around this with
> Kafka 0.9.
>
> Piotrek
>
> On Oct 2, 2017, at 5:30 PM, Antoine Philippot <antoine.philip...@teads.tv>
> wrote:
>
> Thanks Piotr for your answer. Sadly, we can't use Kafka 0.11 for now
> (and won't for a while).
>
> We cannot afford tens of thousands of duplicated messages for each
> application upgrade. Can I help by working on this feature?
> Do you have any hints or details on this part of that "TODO" list?
>
> On Mon, Oct 2, 2017 at 4:50 PM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> For failure recovery with Kafka 0.9 it is not possible to avoid
>> duplicated messages. Using Flink 1.4 (not yet released) combined with
>> Kafka 0.11, it will be possible to achieve exactly-once end-to-end
>> semantics when writing to Kafka. However, this is still a work in
>> progress:
>>
>> https://issues.apache.org/jira/browse/FLINK-6988
>>
>> That said, this is a superset of the functionality you are asking for.
>> Exactly-once just for clean shutdowns is also on our "TODO" list (it
>> would/could support Kafka 0.9), but it is not currently being actively
>> developed.
>>
>> Piotr Nowojski
>>
>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philip...@teads.tv>
>> wrote:
>>
>> Hi,
>>
>> I'm working on a Flink streaming app with a Kafka 0.9 to Kafka 0.9 use
>> case which handles around 100k messages per second.
>>
>> To upgrade our application, we run a flink cancel with savepoint
>> command followed by a flink run with the previously saved savepoint and
>> the new application fat jar as parameters. We noticed that we can have
>> more than 50k duplicated messages in the Kafka sink, which is not
>> idempotent.
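>>
>> For reference, the upgrade procedure looks roughly like this (the job
>> id, savepoint directory, and jar name below are placeholders):
>>
>>   bin/flink cancel -s hdfs:///flink/savepoints <jobID>
>>   bin/flink run -s <savepointPath> our-app-fat.jar
>>
>> where <savepointPath> is the path printed by the cancel command.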
>>
>> This behaviour is problematic for this project, and I am trying to find
>> a solution/workaround to avoid these duplicated messages.
>>
>> The JobManager clearly indicates that the cancel call is triggered once
>> the savepoint is finished, but during the savepoint execution the Kafka
>> source continues to poll new messages, which will not be part of the
>> savepoint and will be replayed on the next application start.
>>
>> I looked for a solution with the stop command-line action, but the
>> Kafka source doesn't implement StoppableFunction
>> (https://issues.apache.org/jira/browse/FLINK-3404), and unlike cancel,
>> stop does not support savepoint generation.
>>
>> Is there another solution to avoid processing duplicated messages on
>> each application upgrade or rescaling?
>>
>> If not, has someone planned to implement it? Otherwise, I can propose a
>> pull request after some architectural advice.
>>
>> The final goal is to stop polling the source and trigger a savepoint
>> once polling has stopped; a rough sketch of such a stoppable source
>> follows at the end of this mail.
>>
>> Thanks
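>>
>> PS: a minimal, hypothetical sketch of what such a stoppable source
>> could look like, assuming the StoppableFunction interface from
>> FLINK-3404 (the poll() helper is a stand-in for the real consumer, not
>> actual connector code):
>>
>>   import org.apache.flink.api.common.functions.StoppableFunction;
>>   import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>>   public class StoppablePollingSource
>>           implements SourceFunction<String>, StoppableFunction {
>>
>>       private volatile boolean running = true;
>>
>>       @Override
>>       public void run(SourceContext<String> ctx) throws Exception {
>>           while (running) {
>>               String record = poll(); // stand-in for KafkaConsumer#poll
>>               if (record != null) {
>>                   // emit under the checkpoint lock so emitted records
>>                   // and checkpointed state stay consistent
>>                   synchronized (ctx.getCheckpointLock()) {
>>                       ctx.collect(record);
>>                   }
>>               }
>>           }
>>       }
>>
>>       @Override
>>       public void cancel() {
>>           running = false;
>>       }
>>
>>       @Override
>>       public void stop() {
>>           // clean stop: exit the polling loop; a savepoint could then
>>           // be triggered once all sources have stopped polling
>>           running = false;
>>       }
>>
>>       private String poll() {
>>           return null; // placeholder; a real source would poll Kafka here
>>       }
>>   }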