Thank you! One last question regarding Gordon's response: when a pipeline stops consuming and cleanly shuts down without any error during that process, and it is then started again using the last committed offset in Kafka, there should be no data loss - or am I missing something?
In what scenario should I expect data loss? (I can only think of the jobmanager or taskmanager getting killed before the shutdown is done.)

Best,
Tobi

On Mon, Mar 2, 2020 at 1:45 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> Sorry for my previous slightly confusing response, please take a look at
> the response from Gordon.
>
> Piotrek
>
> On 2 Mar 2020, at 12:05, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>
> Hi,
>
> let me refine my question: my pipeline is generated from Beam, so the
> Flink pipeline is a translated Beam pipeline. When I update my Apache Beam
> pipeline code, working with a snapshot in Flink to stop the pipeline is not
> an option, as the snapshot will use the old representation of the Flink
> pipeline when resuming from that snapshot.
>
> Meaning that I am looking for a way to drain the pipeline cleanly and
> use the last committed offset in Kafka to resume processing after I
> start it again (launching it through Beam will regenerate the Flink
> pipeline and it should resume at the offset where it left off, that is the
> latest committed offset in Kafka).
>
> Can this be achieved with a cancel or stop of the Flink pipeline?
>
> Best,
> Tobias
>
> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi Tobi,
>>
>> No, FlinkKafkaConsumer is not using committed Kafka offsets for
>> recovery. The offsets to start from are stored in the checkpoint itself.
>> Updating the offsets back to Kafka is optional, a purely cosmetic thing
>> from Flink's perspective, so the job will start from the correct offsets.
>>
>> However, if you for whatever reason restart the job from a
>> savepoint/checkpoint that's not the latest one, this will violate
>> exactly-once guarantees - there will be duplicates in the sinks, as some
>> records would be processed and committed twice.
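[Editor's note: the behavior Piotrek describes above - checkpointed offsets win on recovery, committed Kafka offsets are only a fallback start position - can be made concrete with a small configuration sketch for Flink 1.9's FlinkKafkaConsumer. This is an untested illustration, not code from the thread; the broker address, topic, and group id are made up.]

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are committed back to Kafka on checkpoint completion.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // illustrative
        props.setProperty("group.id", "my-consumer-group");   // illustrative

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Only consulted when the job starts WITHOUT a checkpoint/savepoint;
        // on recovery, the offsets stored in Flink's checkpoint take precedence.
        consumer.setStartFromGroupOffsets();

        // The "cosmetic" commit back to Kafka - it is what lets a fresh start
        // (no savepoint) resume near where the previous job left off.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("kafka-offset-sketch");
    }
}
```

Running this requires a Flink runtime and a Kafka broker, so it is a sketch of the wiring rather than a standalone program.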
>> Committing happens on checkpoint, so if you are recovering to some
>> previous checkpoint, there is nothing Flink can do - some records were
>> already committed before.
>>
>> Piotrek
>>
>> On 2 Mar 2020, at 10:12, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>
>> Thank you Piotr!
>>
>> One last question - let's assume my source is a Kafka topic. If I stop
>> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
>> when restarting my job, the job would continue from the last offset that
>> has been committed in Kafka, and thus I would also not experience a loss
>> of data in my sink. Is that correct?
>>
>> Best,
>> Tobi
>>
>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Yes, that's correct. There shouldn't be any data loss. Stop with
>>> savepoint is a solution to make sure that, if you are stopping a job
>>> (either permanently or temporarily), all of the results are
>>> published/committed to external systems before you actually stop the job.
>>>
>>> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint
>>> was completing at the time the cluster was crashing), some records might
>>> not be committed before the cancellation/kill/crash happened. Note that
>>> this doesn't mean there is data loss - those records will simply be
>>> published once you restore your job from a checkpoint. If you want to
>>> stop the job permanently, that restore might never happen, hence we need
>>> stop with savepoint.
>>>
>>> Piotrek
>>>
>>> On 28 Feb 2020, at 15:02, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>
>>> Thank you! For understanding the matter: when I have a streaming
>>> pipeline (reading from Kafka, writing somewhere) and I click "cancel",
>>> and after that I restart the pipeline - I should not expect any data to
>>> be lost - is that correct?
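[Editor's note: on the Beam side of Tobi's use case - restarting a regenerated pipeline without a Flink savepoint - Beam's KafkaIO can commit offsets back to Kafka when results are finalized, via `commitOffsetsInFinalize()`. The sketch below is an untested configuration fragment; the broker, topic, and group id are invented for illustration.]

```java
import java.util.Collections;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BeamKafkaResumeSketch {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("kafka:9092")   // illustrative
                .withTopic("my-topic")                // illustrative
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // A group.id is required for offset commits:
                .withConsumerConfigUpdates(Collections.singletonMap(
                        ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
                // Commit offsets to Kafka once results are finalized, so a
                // freshly generated pipeline (started without a savepoint)
                // resumes near where the old one left off:
                .commitOffsetsInFinalize());

        p.run();
    }
}
```

Note the caveat implied by the thread: commits happen on finalization (checkpoint), so a crash between processing and commit means some records are replayed on restart - at-least-once rather than exactly-once when no savepoint is used.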
>>>
>>> Best,
>>> Tobias
>>>
>>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>
>>>> Thanks for confirming that, Yadong. I've created a ticket for it [1].
>>>>
>>>> Piotrek
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-16340
>>>>
>>>> On 28 Feb 2020, at 14:32, Yadong Xie <vthink...@gmail.com> wrote:
>>>>
>>>> Hi
>>>>
>>>> 1. The old stop button was removed in Flink 1.9.0 since, as far as I
>>>> know, it could not work properly.
>>>> 2. If we have the stop-with-savepoint feature, we could add it to the
>>>> web UI, but it may still need some work on the REST API to support the
>>>> new feature.
>>>>
>>>> Best,
>>>> Yadong
>>>>
>>>> Piotr Nowojski <pi...@ververica.com> 于2020年2月28日周五 下午8:49写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm not sure. Maybe Yadong (CC) will know more, but to the best of my
>>>>> knowledge and research:
>>>>>
>>>>> 1. In Flink 1.9 we switched from the old web UI to a new one; that
>>>>> probably explains the difference you are seeing.
>>>>> 2. The "Stop" button in the old web UI was not working properly - that
>>>>> was not stop with savepoint, as stop with savepoint is a relatively new
>>>>> feature.
>>>>> 3. Now that we have stop with savepoint (it can be used from the CLI as
>>>>> you wrote), we could probably expose this feature in the new UI as well,
>>>>> unless it's already exposed somewhere? Yadong, do you know the answer to
>>>>> that?
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 27 Feb 2020, at 13:31, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after
>>>>> clicking that button in the web interface it performed a clean shutdown.
>>>>> Now with Flink 1.9 I just see the option to cancel it.
>>>>>
>>>>> However, using the command line, flink stop -d
>>>>> 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the
>>>>> functionality is there.
>>>>>
>>>>> Has the button been removed on purpose?
>>>>>
>>>>> Best,
>>>>> Tobias
>>>
>>> --
>>>
>>> Tobias Kaymak
>>> Data Engineer
>>> Data Intelligence
>>>
>>> tobias.kay...@ricardo.ch
>>> www.ricardo.ch
>>> Theilerstrasse 1a, 6300 Zug