Hi Vishwas

Imagine this scenario: your last committed offset is A, with a savepoint-A. You 
then stop the job and try new program logic, such as printing the output 
instead of writing to the previous sink, to run some experiments. The new 
experimental job might commit offset-B to Kafka. Once you have verified the 
change, you still need to resume from Kafka offset-A to ensure that all data is 
written to the target sink. This is easier if you simply restore the job from 
savepoint-A.

In other words, Flink already provides a stronger and more flexible mechanism 
for resuming Kafka offsets, so why not use it?
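
To make the scenario concrete, here is a toy model in plain Java. It is only a 
sketch and not Flink code: the class name, the method name and the offset 
values 100 and 103 are made up for illustration, and a single partition is 
assumed. It lists the offsets that never reach the target sink, depending on 
where the production job resumes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the savepoint-A / offset-B scenario; not Flink code. */
public class SavepointVsGroupOffsets {

    /**
     * Offsets the target sink never receives if the production job stopped at
     * savepointOffset (next offset to read) and later resumes at resumeOffset.
     */
    static List<Long> missingFromSink(long savepointOffset, long resumeOffset) {
        List<Long> missing = new ArrayList<>();
        for (long offset = savepointOffset; offset < resumeOffset; offset++) {
            missing.add(offset);
        }
        return missing;
    }

    public static void main(String[] args) {
        long offsetA = 100L; // hypothetical: position stored in savepoint-A
        long offsetB = 103L; // hypothetical: committed to Kafka by the experimental job

        // Restart without -s: the source falls back to the group offset B.
        System.out.println("resume from group offsets, missing: "
                + missingFromSink(offsetA, offsetB)); // prints [100, 101, 102]

        // Restart with -s savepoint-A: the source continues at A.
        System.out.println("restore from savepoint-A, missing: "
                + missingFromSink(offsetA, offsetA)); // prints []
    }
}
```

In this model the records at offsets 100 to 102 were only printed by the 
experimental job, so they are lost for the target sink unless the job is 
restored from savepoint-A.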

Yun Tang
From: Congxian Qiu <qcx978132...@gmail.com>
Sent: Thursday, October 10, 2019 11:52
To: theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de>
Cc: user <user@flink.apache.org>
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Sorry for the confusion; what Theo said earlier is what I meant. 
What I said before was from Flink's side: if we do not restore from a 
checkpoint/savepoint, none of the TMs will have any state, so the job starts from 


wrote on Thu, Oct 10, 2019 at 1:15 AM:
Hi Vishwas,

With "from scratch", Congxian means that Flink won't load any state 
automatically and starts as if there was no state. Of course if the kafka 
consumer group already exists and you have configured Flink to start from group 
offsets if there is no state yet, it will start from the group offsets.
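
The precedence described above can be sketched as a small plain-Java model. 
This is a simplification for a single partition and not Flink's actual 
implementation; the class and method names are hypothetical. The last fallback 
(the position given by the Kafka `auto.offset.reset` property when the group 
has no committed offset) is not mentioned in this thread and is added here as 
an assumption based on the Kafka connector documentation.

```java
import java.util.Optional;

/** Simplified model of how the start offset is chosen; not Flink's actual code. */
public class StartOffsetModel {

    /**
     * @param restoredFromState       offset from a checkpoint/savepoint, present only when run with -s
     * @param committedGroupOffset    offset committed for the consumer group, if any
     * @param autoOffsetResetPosition assumed fallback position (auto.offset.reset)
     */
    static long resolveStartOffset(Optional<Long> restoredFromState,
                                   Optional<Long> committedGroupOffset,
                                   long autoOffsetResetPosition) {
        if (restoredFromState.isPresent()) {
            // Restored state wins over the configured startup mode.
            return restoredFromState.get();
        }
        // No state ("from scratch"): start from group offsets if they exist.
        return committedGroupOffset.orElse(autoOffsetResetPosition);
    }

    public static void main(String[] args) {
        // Started with -s: the offset stored in state is used.
        System.out.println(resolveStartOffset(Optional.of(100L), Optional.of(250L), 0L)); // prints 100
        // Started without -s, consumer group already exists.
        System.out.println(resolveStartOffset(Optional.empty(), Optional.of(250L), 0L));  // prints 250
        // Started without -s, brand-new consumer group.
        System.out.println(resolveStartOffset(Optional.empty(), Optional.empty(), 0L));   // prints 0
    }
}
```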

I think your approach is totally fine. Ignoring savepoints and not retaining 
checkpoints saves overhead and configuration burden, and it works nicely as long 
as you don't have any state in your pipeline.

You should, however, be certain that nobody in your team will add something with 
state later on and forget to think about the missing state...

Best regards

-------- Original Message --------
Subject: Re: Flink restoring a job from a checkpoint
From: Vishwas Siravara
To: Congxian Qiu
Cc: Yun Tang, user

Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my 
consumer group does not change? I start from the group offsets: 
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
So when I restart the job, it should consume from the last offset committed to 
Kafka, shouldn't it? Let me know what you think.

On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu 
<qcx978132...@gmail.com<mailto:qcx978132...@gmail.com>> wrote:
Hi Vishwas

Currently, Flink can only restore from a retained checkpoint or a savepoint when 
you pass the `-s` parameter [1][2]; otherwise, it will start from scratch.

checkpoint --> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint  --> bin/flink run -s :savepointPath [:runArgs]



Vishwas Siravara <vsirav...@gmail.com<mailto:vsirav...@gmail.com>> 
wrote on Wed, Oct 9, 2019 at 5:07 AM:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSETS. Here is the code snippet:

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 

I have also enabled checkpointing and externalized the checkpoints to S3.
Why is it not recommended to just restart the job once I cancel it, as long as 
the topology does not change? What is the advantage of explicitly restoring 
from the last checkpoint by passing the -s option to the flink command line if it 
does the same thing? For instance, if 
 is my last successful checkpoint, what is the difference between 1 and 2?

1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ 
-c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp 
2. /usr/mware/flink/bin/flink run -s 
 -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
flink-job-assembly.jar flink druid -p 4 -cp qa_streaming


On Tue, Oct 8, 2019 at 1:51 PM Yun Tang 
<myas...@live.com<mailto:myas...@live.com>> wrote:
Hi Vishwas

If you did not configure your 
org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is 
GROUP_OFFSETS by default, which means "Start from committed offsets in ZK / 
Kafka brokers of a specific consumer group". You also need to enable 
checkpointing so that Kafka offsets are committed when a checkpoint completes.

In other words, even if you don't resume from a checkpoint, as long as you 
enabled checkpointing in the previous job and set the startup mode to 
GROUP_OFFSETS, the job can resume from the last committed offset if the 
previous checkpoint completed [1][2]. However, this is not really recommended; 
it is better to resume from the last checkpoint 

[2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to

Yun Tang

From: Vishwas Siravara <vsirav...@gmail.com<mailto:vsirav...@gmail.com>>
Sent: Wednesday, October 9, 2019 0:54
To: user <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Flink restoring a job from a checkpoint

Hi guys,
I have a Flink streaming job which streams from a Kafka source. There is no 
state in the job, just a simple filter, map and write to a Kafka sink. Suppose 
I stop my job and then submit it again to the cluster with the same 
consumer group. Will the job restore automatically from the last successful 
checkpoint, since that corresponds to the last offset committed to Kafka?

