Hi,

Sorry for entering the discussion somewhat late, but I commented on the issue you created; please have a look.
Best,
Aljoscha

On 20. Oct 2017, at 16:56, Antoine Philippot <antoine.philip...@teads.tv> wrote:

Hi Piotrek,

I come back to you with a Jira ticket that I created and a proposal.

The ticket: https://issues.apache.org/jira/browse/FLINK-7883
The proposal: https://github.com/aphilippot/flink/commit/9c58c95bb4b68ea337f7c583b7e039d86f3142a6

I'm open to any comments or suggestions.

Antoine

On Tue, Oct 10, 2017, at 09:28, Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

That's good to hear :)

I quickly went through the code and it seems reasonable. I think we need to think a little bit more about how this cancel checkpoint should be exposed to the operators and what the default action should be - right now the cancel flag is ignored by default; I would like to consider whether throwing an UnsupportedOperationException would be a better long-term solution.

But at first glance I do not see any larger issues, and it would be great if you could make a pull request out of it.

Piotrek

On 9 Oct 2017, at 15:56, Antoine Philippot <antoine.philip...@teads.tv> wrote:

Thanks for your advice, Piotr.

Firstly, yes, we are aware that even with a clean shutdown we can end up with duplicated messages after a crash, and that is acceptable since it is rare and unintentional, unlike deploying new business code or scaling up/down.

I made a fork of the 1.2.1 version, which we currently use, and developed a simple POC based on the solution of passing a boolean stopSourceSavepoint from the job manager to the source when a cancel with savepoint is triggered. This is the altered code:
https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint

We tested it with our production workload and there are no duplicated messages any more, while hundreds of thousands were duplicated before.

I plan to reapply/adapt this patch for the 1.3.2 release when we migrate to it, and maybe later for 1.4.

I'm open to suggestions, and happy to help develop this feature upstream if you want.

On Mon, Oct 2, 2017, at 19:09, Piotr Nowojski <pi...@data-artisans.com> wrote:

We are planning to work on this clean shutdown after releasing Flink 1.4. Implementing this properly would require some work, for example:
- adding some checkpoint options to carry information about a "closing"/"shutting down" event
- adding clean shutdown to the source functions API
- implementing handling of this clean shutdown in the desired sources

Those are not super complicated changes, but they are also not trivial.

One thing that you could do is to implement some super hacky filter function just after the source operator, which you would trigger manually. Normally it would pass all of the messages. Once triggered, it would wait for the next checkpoint to happen, assume that it is a savepoint, and start filtering out all of the subsequent messages. When this checkpoint completes, you could manually shut down your Flink application. This could guarantee that there are no duplicated writes after a restart.
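A rough sketch of what such a filter might look like (untested and hypothetical; the DrainFilter name and the file-based trigger are placeholders, not part of any Flink API - the only Flink pieces used are RichFilterFunction and CheckpointedFunction, which do exist):

    import java.io.File;

    import org.apache.flink.api.common.functions.RichFilterFunction;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    /**
     * Passes every record through until an external trigger file appears.
     * From the first checkpoint barrier observed after the trigger, all
     * subsequent records are dropped, so the savepoint taken by that
     * checkpoint marks the last data ever emitted downstream.
     */
    public class DrainFilter<T> extends RichFilterFunction<T>
            implements CheckpointedFunction {

        private final String triggerPath;
        private transient volatile boolean dropping;

        public DrainFilter(String triggerPath) {
            this.triggerPath = triggerPath;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            // Called when the checkpoint barrier reaches this operator. If the
            // external trigger is set, treat this checkpoint as "the" savepoint
            // and drop everything that arrives after the barrier.
            if (new File(triggerPath).exists()) {
                dropping = true;
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) {
            // Nothing to restore: after the savepoint completes, the job is
            // shut down manually anyway.
        }

        @Override
        public boolean filter(T value) {
            return !dropping;
        }
    }

You would insert it directly after the source (e.g. stream.filter(new DrainFilter<>("/tmp/flink-drain"))), touch the trigger file, take a savepoint, and cancel the job once the savepoint completes.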
This might work for a clean shutdown, but it would be a very hacky solution.

Btw, keep in mind that even with a clean shutdown you can end up with duplicated messages after a crash, and there is no way around this with Kafka 0.9.

Piotrek

On Oct 2, 2017, at 5:30 PM, Antoine Philippot <antoine.philip...@teads.tv> wrote:

Thanks Piotr for your answer; sadly we can't use Kafka 0.11 for now (and won't for a while).

We cannot afford tens of thousands of duplicated messages for each application upgrade. Can I help by working on this feature? Do you have any hints or details on this part of that "TODO" list?

On Mon, Oct 2, 2017, at 16:50, Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

For failure recovery with Kafka 0.9 it is not possible to avoid duplicated messages. Using Flink 1.4 (not yet released) combined with Kafka 0.11, it will be possible to achieve end-to-end exactly-once semantics when writing to Kafka. However, this is still a work in progress:

https://issues.apache.org/jira/browse/FLINK-6988

This is a superset of the functionality that you are asking for. Exactly-once just for clean shutdowns is also on our "TODO" list (it would/could support Kafka 0.9), but it is not currently being actively developed.

Piotr Nowojski

On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philip...@teads.tv> wrote:

Hi,

I'm working on a Flink streaming app with a Kafka 0.9 to Kafka 0.9 use case which handles around 100k messages per second.

To upgrade our application, we used to run a flink cancel with savepoint command followed by a flink run with the previously saved savepoint and the new application fat jar as parameters. We noticed that we can have more than 50k duplicated messages in the Kafka sink, which is not idempotent.

This behaviour is actually problematic for this project, and I am trying to find a solution / workaround to avoid these duplicated messages.

The JobManager indicates clearly that the cancel call is triggered once the savepoint is finished, but during the savepoint execution the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.

I tried to find a solution with the stop command line argument, but the Kafka source doesn't implement StoppableFunction (https://issues.apache.org/jira/browse/FLINK-3404), and savepoint generation is not available with stop, in contrast to cancel.

Is there another solution to avoid processing duplicated messages on each application upgrade or rescaling?

If not, has someone planned to implement it? Otherwise, I can propose a pull request after some architecture advice.

The final goal is to stop polling the source and trigger a savepoint once polling has stopped; a rough sketch of that idea follows below.

Thanks
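For illustration only, a hypothetical sketch of the source-side behaviour that final goal describes (the drain() hook and its wiring to cancel-with-savepoint do not exist in Flink's API; they are roughly what the linked POC adds):

    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    /**
     * Hypothetical source that stops polling when asked to drain, so that a
     * savepoint taken afterwards captures everything it ever emitted.
     */
    public abstract class DrainableSource<T> extends RichParallelSourceFunction<T> {

        private volatile boolean running = true;
        private volatile boolean draining = false;

        /** Would be invoked when a cancel-with-savepoint is requested. */
        public void drain() {
            draining = true;
        }

        /** Stand-in for one poll against the external system (e.g. Kafka). */
        protected abstract T pollNext() throws Exception;

        @Override
        public void run(SourceContext<T> ctx) throws Exception {
            while (running) {
                if (draining) {
                    // Stop emitting but keep the task alive, so the savepoint
                    // barrier can still flow through the job; the subsequent
                    // cancel tears the task down.
                    Thread.sleep(50);
                    continue;
                }
                T record = pollNext();
                // Emit under the checkpoint lock so no record slips in between
                // the state snapshot and the emission.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

With something like this in place, a savepoint triggered after drain() is called would contain offsets for exactly the records already emitted, so restarting from it would replay nothing.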