Hi,

Sorry for joining the discussion somewhat late, but I commented on the issue
you created; please have a look.

Best,
Aljoscha

> On 20. Oct 2017, at 16:56, Antoine Philippot <antoine.philip...@teads.tv> 
> wrote:
> 
> Hi Piotrek,
> 
> I come back to you with a Jira ticket that I created and a proposal.
> The ticket: https://issues.apache.org/jira/browse/FLINK-7883
> The proposal: 
> https://github.com/aphilippot/flink/commit/9c58c95bb4b68ea337f7c583b7e039d86f3142a6
> 
> I'm open to any comments or suggestions.
> 
> Antoine
> 
> On Tue, 10 Oct 2017 at 09:28, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi, 
> 
> That’s good to hear :)
> 
> I quickly went through the code and it seems reasonable. I think we need to 
> think a little bit more about how this cancel checkpoint should be exposed 
> to the operators and what the default action should be. Right now the cancel 
> flag is ignored by default; I would like to consider whether throwing an 
> UnsupportedOperationException would be a better long-term solution.
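> 
> To make the two defaults concrete, a hypothetical operator-facing hook could 
> look like this (the interface and method names are illustrative only, not 
> the actual proposal's API):
> 
> // Hypothetical hook; the name and signature are made up for illustration.
> public interface CheckpointCancellationAware {
> 
>     // Variant A (current POC behaviour): ignore the flag by default.
>     // default void onCheckpointWithCancel(long checkpointId) { /* no-op */ }
> 
>     // Variant B: fail loudly by default, so that every operator has to
>     // make an explicit choice before the cancel flag has any effect.
>     default void onCheckpointWithCancel(long checkpointId) {
>         throw new UnsupportedOperationException(
>                 "This operator does not support checkpoint-with-cancellation");
>     }
> }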
> 
> But at first glance I do not see any larger issues, and it would be great if 
> you could make a pull request out of it.
> 
> Piotrek
> 
>> On 9 Oct 2017, at 15:56, Antoine Philippot <antoine.philip...@teads.tv> wrote:
>> 
>> Thanks for your advice, Piotr.
>> 
>> Firstly, yes, we are aware that even with a clean shutdown we can end up 
>> with duplicated messages after a crash, and that is acceptable, as it is 
>> rare and unintentional, unlike deploying new business code or scaling up 
>> or down.
>> 
>> I made a fork of the 1.2.1 version, which we currently use, and developed a 
>> simple POC based on the solution of passing a boolean stopSourceSavepoint 
>> from the JobManager to the source when a cancel with savepoint is triggered.
>> This is the altered code: 
>> https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint
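>> 
>> In spirit, the source side of the POC boils down to something like this (a 
>> heavily simplified sketch, not the actual diff; only the stopSourceSavepoint 
>> flag name comes from the patch, the rest is invented for illustration):
>> 
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> 
>> // Sketch: when the JobManager triggers cancel-with-savepoint, a
>> // stopSourceSavepoint flag rides along with the checkpoint trigger and
>> // the source stops polling before the savepoint barrier is emitted.
>> public abstract class StopAwareSource<T> implements SourceFunction<T> {
>> 
>>     private volatile boolean running = true;
>> 
>>     // Invoked by the task when the checkpoint trigger message carries
>>     // stopSourceSavepoint == true (hypothetical plumbing).
>>     public void stopForSavepoint() {
>>         running = false; // stop polling; the savepoint sees no new records
>>     }
>> 
>>     protected boolean isRunning() {
>>         return running;
>>     }
>> 
>>     @Override
>>     public void cancel() {
>>         running = false;
>>     }
>> }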
>> 
>> We tested it with our production workload and there are no duplicated 
>> messages any more, while hundreds of thousands were duplicated before.
>> 
>> I plan to reapply/adapt this patch for the 1.3.2 release when we migrate to 
>> it, and maybe later to 1.4.
>> 
>> I'm open to suggestions, and happy to help develop this feature upstream if 
>> you want.
>> 
>> 
>> On Mon, 2 Oct 2017 at 19:09, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> We are planning to work on this clean shutdown after releasing Flink 1.4. 
>> Implementing it properly would require some work, for example:
>> - adding some checkpoint options to carry information about a 
>> “closing”/“shutting down” event
>> - adding a clean shutdown to the source functions API (sketched below)
>> - implementing handling of this clean shutdown in the desired sources
>> 
>> Those are not super complicated changes, but they are also not trivial.
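>> 
>> For illustration, the second bullet might end up looking something like 
>> this (the interface name is invented; nothing here is a committed design):
>> 
>> // Invented illustration of a clean-shutdown hook on the source functions
>> // API. No such interface exists in Flink today.
>> public interface CleanShutdownAware {
>> 
>>     /**
>>      * Called before the "shutting down" savepoint is triggered. The source
>>      * should stop emitting records and return once it is idle, so that
>>      * the savepoint captures a state with no records in flight behind it.
>>      */
>>     void prepareCleanShutdown() throws Exception;
>> }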
>> 
>> One thing that you could do is to implement some super hacky filter 
>> function just after the source operator, which you would trigger manually. 
>> Normally it would pass all of the messages. Once triggered, it would wait 
>> for the next checkpoint to happen, assume that it is a savepoint, and 
>> start filtering out all of the subsequent messages. When this checkpoint 
>> completes, you could manually shut down your Flink application. This could 
>> guarantee that there are no duplicated writes after a restart. It might 
>> work for a clean shutdown, but it would be a very hacky solution.
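>> 
>> A rough sketch of that filter, assuming the trigger is a static volatile 
>> flag flipped out-of-band (e.g. via JMX or a marker file); the class name 
>> and trigger mechanism are made up, while CheckpointedFunction and 
>> CheckpointListener are real Flink interfaces:
>> 
>> import org.apache.flink.api.common.functions.RichFilterFunction;
>> import org.apache.flink.runtime.state.CheckpointListener;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> 
>> public class DrainingFilter<T> extends RichFilterFunction<T>
>>         implements CheckpointedFunction, CheckpointListener {
>> 
>>     // Flipped externally to request a clean shutdown. Being static, it is
>>     // per-JVM, so it has to be flipped on every TaskManager (part of the
>>     // hackiness).
>>     public static volatile boolean drainRequested = false;
>> 
>>     private volatile boolean dropping = false;
>> 
>>     @Override
>>     public boolean filter(T value) {
>>         return !dropping; // pass everything until draining starts
>>     }
>> 
>>     @Override
>>     public void snapshotState(FunctionSnapshotContext context) {
>>         // A barrier arrived: if a drain was requested, assume this
>>         // checkpoint is the savepoint and drop everything that follows it.
>>         if (drainRequested) {
>>             dropping = true;
>>         }
>>     }
>> 
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) {
>>         // Intentionally stateless: after a restart the filter passes again.
>>     }
>> 
>>     @Override
>>     public void notifyCheckpointComplete(long checkpointId) {
>>         // Once the savepoint completes, cancel the job manually; no record
>>         // behind the barrier ever reached the sink.
>>     }
>> }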
>> 
>> Btw, keep in mind that even with a clean shutdown you can end up with 
>> duplicated messages after a crash, and there is no way around this with 
>> Kafka 0.9.
>> 
>> Piotrek
>> 
>>> On Oct 2, 2017, at 5:30 PM, Antoine Philippot <antoine.philip...@teads.tv> wrote:
>>> 
>>> Thanks Piotr for your answer. Sadly, we can't use Kafka 0.11 for now (and 
>>> won't be able to for a while).
>>> 
>>> We cannot afford tens of thousands of duplicated messages for each 
>>> application upgrade. Can I help by working on this feature?
>>> Do you have any hints or details on this part of that "todo list"?
>>> 
>>> On Mon, 2 Oct 2017 at 16:50, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>> Hi,
>>> 
>>> For failure recovery with Kafka 0.9 it is not possible to avoid duplicated 
>>> messages. Using Flink 1.4 (not yet released) combined with Kafka 0.11, it 
>>> will be possible to achieve exactly-once end-to-end semantics when writing 
>>> to Kafka. However, this is still a work in progress:
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-6988
>>> 
>>> However, this is a superset of the functionality that you are asking for. 
>>> Exactly-once just for clean shutdowns is also on our “TODO” list (it 
>>> would/could support Kafka 0.9), but it is not currently under active 
>>> development.
>>> 
>>> Piotr Nowojski
>>> 
>>>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philip...@teads.tv> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I'm working on a Flink streaming app with a Kafka 0.9 to Kafka 0.9 use 
>>>> case which handles around 100k messages per second.
>>>> 
>>>> To upgrade our application, we run a flink cancel-with-savepoint command 
>>>> followed by a flink run with the previously saved savepoint and the new 
>>>> application fat jar as parameters. We noticed that we can have more than 
>>>> 50k duplicated messages in the Kafka sink, which is not idempotent.
>>>> 
>>>> This behaviour is problematic for this project, and I am trying to find 
>>>> a solution / workaround to avoid these duplicated messages.
>>>> 
>>>> The JobManager clearly indicates that the cancel call is triggered once 
>>>> the savepoint is finished, but during the savepoint execution the Kafka 
>>>> source continues to poll new messages, which will not be part of the 
>>>> savepoint and will be replayed on the next application start.
>>>> 
>>>> I tried to find a solution with the stop command-line argument, but the 
>>>> Kafka source doesn't implement StoppableFunction 
>>>> (https://issues.apache.org/jira/browse/FLINK-3404), and savepoint 
>>>> generation is not available with stop, contrary to cancel.
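>>>> 
>>>> For reference, this is roughly what a source implementing 
>>>> StoppableFunction looks like (the interface and its single stop() method 
>>>> are real Flink API; the toy source around them is just a sketch):
>>>> 
>>>> import org.apache.flink.api.common.functions.StoppableFunction;
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>> 
>>>> public class ToyStoppableSource
>>>>         implements SourceFunction<Long>, StoppableFunction {
>>>> 
>>>>     private volatile boolean running = true;
>>>>     private long counter = 0;
>>>> 
>>>>     @Override
>>>>     public void run(SourceContext<Long> ctx) throws Exception {
>>>>         while (running) {
>>>>             synchronized (ctx.getCheckpointLock()) {
>>>>                 ctx.collect(counter++);
>>>>             }
>>>>         }
>>>>         // returning from run() ends the job gracefully (stop semantics)
>>>>     }
>>>> 
>>>>     @Override
>>>>     public void stop() {
>>>>         running = false; // graceful: finish in-flight work, then return
>>>>     }
>>>> 
>>>>     @Override
>>>>     public void cancel() {
>>>>         running = false; // hard cancel: no graceful guarantees
>>>>     }
>>>> }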
>>>> 
>>>> Is there another solution to avoid processing duplicated messages on 
>>>> each application upgrade or rescaling?
>>>> 
>>>> If not, has someone planned to implement it? Otherwise, I can propose a 
>>>> pull request after some architecture advice.
>>>> 
>>>> The final goal is to stop the polling source and trigger a savepoint 
>>>> once polling has stopped.
>>>> 
>>>> Thanks
>>> 
>> 
> 
