Hi,
thanks for your answer. It seems like it will not be possible for me to upgrade 
to the newer universal Flink producer, because of an older Kafka version I am 
reading from. So unfortunately for now I will have to go with the hack.
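
For reference, a minimal sketch of what that workaround might look like. This is not taken from the actual job: it assumes FlinkKafkaProducer010 (the thread does not confirm which producer is in use), a Flink version where SimpleStringSchema lives in org.apache.flink.api.common.serialization, and a hypothetical topic name, broker address, and 365-day interval.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class RareCheckpointJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep checkpointing formally enabled so the producer does not disable
        // flushing, but use a very long interval (assumed here: 365 days) so a
        // periodic checkpoint is very unlikely to trigger between daily restarts.
        env.enableCheckpointing(TimeUnit.DAYS.toMillis(365));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        FlinkKafkaProducer010<String> producer =
                new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), props);
        // Flush pending records whenever a checkpoint or savepoint is taken.
        producer.setFlushOnCheckpoint(true);
        // Fail the job on send errors instead of only logging them.
        producer.setLogFailuresOnly(false);

        // Placeholder source; the real job reads from Kafka.
        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("rare-checkpoint-job");
    }
}
```

With checkpointing enabled like this, the "Disabling flushing" warning should no longer appear, and the flush should happen when the job is cancelled with a savepoint.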
Thanks
________________________________
From: Piotr Nowojski <pnowoj...@apache.org>
Sent: 03 March 2021 21:10
To: Witzany, Tomas <tomas.witz...@blindspot.ai>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Flink KafkaProducer flushing on savepoints

Hi,

What Flink version and which FlinkKafkaProducer version are you using?  
`FlinkKafkaProducerBase` is no longer used in the latest version. I would guess 
an older Flink version, with FlinkKafkaProducer010 or earlier (no longer supported).

I would suggest using either the universal FlinkKafkaProducer or 
FlinkKafkaProducer011 (if you are using a really old Flink version that doesn't 
have the universal Kafka connector). Both of those should work with any Kafka 
version, and from looking at the code it seems to me that neither of them has the 
problem you mentioned. If you select 
`org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic#AT_LEAST_ONCE`
 and disable checkpointing, it should still flush records on savepoints.

> The only thing I can think about is have checkpoints enabled with some very 
> high periodicity so that they are never(almost) triggered. But this is a hack.

Yes, it would be a hack. But it would work.

Best,
Piotrek

On Tue, 2 Mar 2021 at 12:09, Witzany, Tomas <tomas.witz...@blindspot.ai> wrote:
Hi,
I have a question about the at-least-once guarantees for Kafka producers when 
checkpointing is disabled. In our data pipeline we have a Flink job on an 
unbounded stream that originally had checkpoints turned on. In addition, this 
job is cancelled with a savepoint once a day to do some data pre- and 
post-processing for the next day; afterwards the job is restarted from the 
savepoint.

The issue we have is that we want to turn off checkpointing, since it does not 
give us much value and only creates extra IO. When this is done, this message 
<https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L263>
shows up:
"Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling 
flushing."
This prompted us to investigate, and it seems that if you have checkpointing 
disabled, there are no at-least-once guarantees. 
<https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L65>

What about if you have no checkpointing, but you make savepoints that you 
restore from yourself? Savepoints are the same thing as checkpoints in the 
code. The Flink producer makes it impossible to turn on flushing and have 
checkpointing disabled. I can see why this is the case as there is some extra 
synchronization overhead related to the flushing flag being on. Is there a way 
to have checkpointing disabled and have at least once guarantees on savepoints?

The only thing I can think about is have checkpoints enabled with some very 
high periodicity so that they are never(almost) triggered. But this is a hack.

Tomas Witzany
