Thank you! One last question regarding Gordon's response. When a pipeline
stops consuming and cleanly shuts down and there is no error during that
process, and then it gets started again and uses the last committed offset
in Kafka - there should be no data loss - or am I missing something?

In what scenario should I expect data loss? (I can only think of the
jobmanager or taskmanager getting killed before the shutdown is done.)

Best,
Tobi

On Mon, Mar 2, 2020 at 1:45 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> Sorry for my previous slightly confusing response, please take a look at
> the response from Gordon.
>
> Piotrek
>
> On 2 Mar 2020, at 12:05, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>
> Hi,
>
> let me refine my question: My pipeline is generated from Beam, so the
> Flink pipeline is a translated Beam pipeline. When I update my Apache Beam
> pipeline code, working with a snapshot in Flink to stop the pipeline is not
> an option, as the snapshot will use the old representation of the Flink
> pipeline when resuming from that snapshot.
>
> Meaning that I am looking for a way to drain the pipeline cleanly and then
> use the last committed offset in Kafka to resume processing after I start
> it again (launching it through Beam will regenerate the Flink pipeline, and
> it should resume at the offset where it left off, that is, the latest
> committed offset in Kafka).
>
> Can this be achieved with a cancel or stop of the Flink pipeline?
>
> Best,
> Tobias
>
> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi Tobi,
>>
>> No, FlinkKafkaConsumer does not use the offsets committed to Kafka for
>> recovery. The offsets to start from are stored in the checkpoint itself.
>> Committing the offsets back to Kafka is optional and purely cosmetic from
>> Flink’s perspective, so the job will start from the correct offsets.
>>
>> However, if for whatever reason you restart the job from a
>> savepoint/checkpoint that’s not the latest one, this will violate
>> exactly-once guarantees: some records will be processed and committed to
>> the sinks twice. Committing happens on checkpoint, so if you recover to an
>> earlier checkpoint there is nothing Flink can do; those records were
>> already committed before.
>>
>> Piotrek
>>
>> On 2 Mar 2020, at 10:12, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>
>> Thank you Piotr!
>>
>> One last question - let's assume my source is a Kafka topic - if I stop
>> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
>> when restarting my job - the job would continue from the last offset that
>> has been committed in Kafka and thus I would also not experience a loss of
>> data in my sink. Is that correct?
>>
>> Best,
>> Tobi
>>
>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>
>>> Yes, that’s correct. There shouldn’t be any data loss. Stop with
>>> savepoint is a way to make sure that, if you are stopping a job (either
>>> permanently or temporarily), all of the results are published/committed
>>> to external systems before you actually stop the job.
>>>
>>> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint
>>> was completing at the time the cluster was crashing), some records might
>>> not be committed before the cancellation/kill/crash happened. Note that
>>> this doesn’t mean there is data loss; those records will simply be
>>> published once you restore your job from a checkpoint. If you want to stop
>>> the job permanently, that restore might never happen, hence the need for
>>> stop with savepoint.
>>>
>>> Piotrek
>>>
>>> On 28 Feb 2020, at 15:02, Kaymak, Tobias <tobias.kay...@ricardo.ch>
>>> wrote:
>>>
>>> Thank you! To make sure I understand: when I have a streaming pipeline
>>> (reading from Kafka, writing somewhere), click "cancel", and then restart
>>> the pipeline, I should not expect any data to be lost. Is that correct?
>>>
>>> Best,
>>> Tobias
>>>
>>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>>
>>>> Thanks for confirming that, Yadong. I’ve created a ticket for that [1].
>>>>
>>>> Piotrek
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-16340
>>>>
>>>> On 28 Feb 2020, at 14:32, Yadong Xie <vthink...@gmail.com> wrote:
>>>>
>>>> Hi
>>>>
>>>> 1. As far as I know, the old stop button was removed in Flink 1.9.0
>>>> because it did not work properly.
>>>> 2. If we have the stop with savepoint feature, we could add it to the
>>>> web UI, but it may still need some work on the REST API to support the
>>>> new feature.
>>>>
>>>>
>>>> Best,
>>>> Yadong
>>>>
>>>>
>>>> On Fri, Feb 28, 2020 at 8:49 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my
>>>>> knowledge and research:
>>>>>
>>>>> 1. In Flink 1.9 we switched from the old web UI to a new one, which
>>>>> probably explains the difference you are seeing.
>>>>> 2. The “Stop” button in the old web UI was not working properly, and it
>>>>> was not stop with savepoint, as stop with savepoint is a relatively new
>>>>> feature.
>>>>> 3. Now that we have stop with savepoint (it can be used from the CLI, as
>>>>> you wrote), we could probably expose this feature in the new UI as well,
>>>>> unless it’s already exposed somewhere? Yadong, do you know the answer to
>>>>> that?
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 27 Feb 2020, at 13:31, Kaymak, Tobias <tobias.kay...@ricardo.ch>
>>>>> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after
>>>>> clicking that button in the web interface it performed a clean shutdown.
>>>>> Now with Flink 1.9 I just see the option to cancel it.
>>>>>
>>>>> However, running flink stop -d 266c5b38cf9d8e61a398a0bef4a1b350 on the
>>>>> command line still does the trick. So the functionality is there.
>>>>>
>>>>> Has the button been removed on purpose?
>>>>>
>>>>> Best,
>>>>> Tobias
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Tobias Kaymak
>>> Data Engineer
>>> Data Intelligence
>>>
>>> tobias.kay...@ricardo.ch
>>> www.ricardo.ch
>>> Theilerstrasse 1a, 6300 Zug
>>>
>>>
>>>
>>
>
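
The checkpoint behaviour Piotr describes in this thread (read offsets are stored in the checkpoint, sink output is committed on checkpoint, and restoring from an older checkpoint duplicates records) can be illustrated with a small toy model. This is only a sketch in plain Python, not Flink or Kafka code: the ToyJob class, its fields, and the run helper are hypothetical names made up for illustration, and the model assumes a single source and a sink that commits only at checkpoint time.

```python
from dataclasses import dataclass, field


@dataclass
class ToyJob:
    """Toy model only, not Flink code. All names here are hypothetical."""
    records: list                                     # stands in for the Kafka topic
    offset: int = 0                                   # next record to read
    pending: list = field(default_factory=list)       # sink output not yet committed
    sink: list = field(default_factory=list)          # committed sink output
    checkpoints: list = field(default_factory=list)   # read offset stored per checkpoint

    def process(self, n):
        """Read up to n records and buffer their output until the next checkpoint."""
        for _ in range(n):
            if self.offset >= len(self.records):
                break
            self.pending.append(self.records[self.offset])
            self.offset += 1

    def checkpoint(self):
        """Store the read offset in the checkpoint and commit the buffered output."""
        self.checkpoints.append(self.offset)
        self.sink.extend(self.pending)
        self.pending.clear()

    def restore(self, checkpoint_index):
        """Resume from a stored checkpoint; uncommitted output is discarded."""
        self.offset = self.checkpoints[checkpoint_index]
        self.pending.clear()


def run(restore_index):
    """Process ten records, crash after two checkpoints, restore, and finish."""
    job = ToyJob(records=list(range(10)))
    job.process(3)
    job.checkpoint()            # checkpoint 0 stores offset 3
    job.process(3)
    job.checkpoint()            # checkpoint 1 stores offset 6
    job.process(2)              # records 6 and 7 are read, then the job crashes
    job.restore(restore_index)
    job.process(10)
    job.checkpoint()
    return job.sink


latest = run(1)   # restore from the latest checkpoint
older = run(0)    # restore from an older checkpoint
print("latest checkpoint:", latest)
print("older checkpoint: ", older)
```

In this model, restoring from the latest checkpoint leaves every record in the sink exactly once, while restoring from the older checkpoint re-reads records 3 to 5, which were already committed, so they appear twice. In neither case is a record missing from the sink.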
