Re: Flink KafkaProducer flushing on savepoints

2021-03-05 Thread Piotr Nowojski
Yes, that might be an issue. As far as I remember, the universal connector
works with Kafka 0.10.x or higher.
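
Whether the universal connector is an option comes down to the broker version. Below is a minimal sketch of that check in plain Java. It assumes purely numeric version strings, and it takes the 0.10 minimum from the recollection above rather than from the connector documentation. `KafkaVersionCheck` is a hypothetical helper, not part of Flink or Kafka.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink or Kafka: compares dotted, purely numeric
// version strings such as "0.9.0.1" or "2.4.1" (suffixes like "-rc1" are not handled).
final class KafkaVersionCheck {

    // Minimum broker version as recalled in the thread; verify against the connector docs.
    static final String UNIVERSAL_CONNECTOR_MIN = "0.10";

    // Negative, zero or positive as a is older than, equal to, or newer than b.
    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            // Missing components count as 0, so "0.10" equals "0.10.0".
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    static boolean supportsUniversalConnector(String brokerVersion) {
        return compare(brokerVersion, UNIVERSAL_CONNECTOR_MIN) >= 0;
    }

    private static int[] parse(String version) {
        return Arrays.stream(version.split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    public static void main(String[] args) {
        for (String v : new String[] {"0.9.0.1", "0.10.2", "2.4.1"}) {
            System.out.println(v + " -> " + supportsUniversalConnector(v));
        }
    }
}
```

For a broker older than 0.10, the check fails. That matches the case above, where upgrading to the universal producer is not possible.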

Piotrek

On Fri, 5 Mar 2021 at 11:20, Witzany, Tomas
wrote:

> Hi,
> thanks for your answer. It seems like it will not be possible for me to
> upgrade to the newer universal Flink producer, because of an older Kafka
> version I am reading from. So unfortunately for now I will have to go with
> the hack.
> Thanks


Re: Flink KafkaProducer flushing on savepoints

2021-03-05 Thread Witzany, Tomas
Hi,
thanks for your answer. It seems like it will not be possible for me to upgrade 
to the newer universal Flink producer, because of an older Kafka version I am 
reading from. So unfortunately for now I will have to go with the hack.
Thanks



Re: Flink KafkaProducer flushing on savepoints

2021-03-03 Thread Piotr Nowojski
Hi,

What Flink version and which FlinkKafkaProducer version are you using?
`FlinkKafkaProducerBase` is no longer used in the latest version, so I would
guess you are on an older Flink version and using FlinkKafkaProducer010 or
earlier (no longer supported).

I would suggest using either the universal FlinkKafkaProducer or
FlinkKafkaProducer011 (if you are on a really old Flink version that doesn't
have the universal Kafka connector). Both of those should work with any Kafka
version, and from looking at the code it seems to me that neither of them has
the problem you mentioned. If you select
`org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic#AT_LEAST_ONCE`
and disable checkpointing, it should still flush records on savepoints.
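
The suggested behaviour can be pictured with a toy model in plain Java. This is a sketch, not Flink's API. `AtLeastOnceModel` and its methods are hypothetical. The model assumes, following the reading of the code above rather than a verified trace, that an AT_LEAST_ONCE producer flushes on every snapshot. That includes a savepoint taken while periodic checkpointing is disabled. The same object stands in for the restored job, because only `topic` and `savedOffset` matter across the restart. The model also shows what at-least-once allows: a record processed between the savepoint and the cancel is written twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model, not Flink's API: the sink flushes on every snapshot, savepoints included,
// whether or not periodic checkpointing is enabled.
final class AtLeastOnceModel {
    final List<String> topic = new ArrayList<>();          // records durably written to Kafka
    private final List<String> buffer = new ArrayList<>(); // handed to the producer, not yet written
    private int savedOffset;                                // source position stored in the savepoint

    void emit(String record) {
        buffer.add(record);
    }

    // Flush first, then record the source position, so the savepoint covers every earlier write.
    void savepoint(int offset) {
        flush();
        savedOffset = offset;
    }

    // Stands in for the producer writing its buffer, on a snapshot or in the background.
    void flush() {
        topic.addAll(buffer);
        buffer.clear();
    }

    // One daily cycle: savepoint after two records, cancel, restore, finish the input.
    static List<String> dailyCycle() {
        List<String> input = Arrays.asList("a", "b", "c", "d");
        AtLeastOnceModel job = new AtLeastOnceModel();
        job.emit("a");
        job.emit("b");
        job.savepoint(2);  // "a" and "b" reach Kafka before the savepoint completes
        job.emit("c");     // processed after the savepoint but before the cancel...
        job.flush();       // ...and written by the producer anyway
        // Restore: the source rewinds to the offset stored in the savepoint.
        for (String record : input.subList(job.savedOffset, input.size())) {
            job.emit(record);
        }
        job.flush();
        return job.topic;  // [a, b, c, c, d]: nothing lost, "c" written twice
    }
}
```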

> The only thing I can think of is to have checkpoints enabled with a very
> long interval, so that they are (almost) never triggered. But this is a
> hack.

Yes, it would be a hack. But it would work.
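
The workaround amounts to choosing a checkpoint interval that is longer than one daily run. The sketch below only does the arithmetic. `CheckpointIntervalHack` is a hypothetical name. The sketch assumes that periodic checkpoints start one full interval after the job starts and then repeat every interval. How Flink actually schedules the first checkpoint may differ between versions, so check the behaviour of your version before relying on a count of zero.

```java
import java.time.Duration;

// Hypothetical helper for the "very long checkpoint interval" workaround; plain arithmetic only.
final class CheckpointIntervalHack {

    // Number of periodic checkpoints that can start during one run, assuming they fire at
    // interval, 2 * interval, ... after the job starts (an assumption, see the text above).
    static long periodicCheckpointsPerRun(Duration runLength, Duration interval) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return runLength.toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        Duration run = Duration.ofDays(1);     // job is cancelled with a savepoint once a day
        Duration hourly = Duration.ofHours(1);
        Duration weekly = Duration.ofDays(7);  // candidate "very long" interval
        System.out.println(periodicCheckpointsPerRun(run, hourly)); // 24
        System.out.println(periodicCheckpointsPerRun(run, weekly)); // 0
    }
}
```

In a Flink job, the chosen value would be passed in milliseconds to `StreamExecutionEnvironment#enableCheckpointing(long interval)`, for example `Duration.ofDays(7).toMillis()`.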

Best,
Piotrek



Flink KafkaProducer flushing on savepoints

2021-03-02 Thread Witzany, Tomas
Hi,
I have a question about the at-least-once guarantees for Kafka producers when
checkpointing is disabled. In our data pipeline we have a Flink job on an
unbounded stream, and originally we had checkpointing turned on. Furthermore,
this job is cancelled with a savepoint once a day so that we can do some data
pre- and post-processing for the next day; afterwards, the job is restarted
from the savepoint.

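The issue is that we want to turn off checkpointing, since it does not give
us much value and only creates extra IO. When we do this, the following
message
<https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L263>
shows up:
"Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling 
flushing."
This prompted us to investigate, and it seems that if you have checkpointing
disabled, there are no at-least-once guarantees:
<https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L65>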

What if you have no checkpointing, but take savepoints yourself and restore
from them? In the code, savepoints are treated the same as checkpoints. The
Flink producer makes it impossible to turn on flushing while checkpointing is
disabled. I can see why this is the case, as there is some extra
synchronization overhead when the flushing flag is on. Is there a way to have
checkpointing disabled and still get at-least-once guarantees on savepoints?
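
The effect of the disabled flush on the daily savepoint cycle can be pictured with a toy model in plain Java. This is a sketch under stated assumptions, not Flink's API. `BaseProducerModel` and its methods are hypothetical. The flag logic follows the warning message quoted above. The model assumes the worst case, in which records the producer still buffers when the job is cancelled never reach Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink's API: a producer that flushes on snapshots only when flushing was
// requested AND periodic checkpointing is enabled, as the warning above describes.
final class BaseProducerModel {
    final List<String> topic = new ArrayList<>();          // records durably written to Kafka
    private final List<String> buffer = new ArrayList<>(); // handed to the producer, not yet written
    private final boolean flushOnSnapshot;

    BaseProducerModel(boolean flushOnCheckpoint, boolean checkpointingEnabled) {
        // "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."
        this.flushOnSnapshot = flushOnCheckpoint && checkpointingEnabled;
    }

    void emit(String record) {
        buffer.add(record);
    }

    // A savepoint goes through the same snapshot path as a checkpoint.
    void savepoint() {
        if (flushOnSnapshot) {
            topic.addAll(buffer); // wait until every buffered record is written
            buffer.clear();
        }
    }

    // Worst-case assumption: records still buffered when the job is cancelled never reach Kafka.
    void cancel() {
        buffer.clear();
    }

    // Emits "a" and "b", takes a savepoint (source offset 2), cancels, and returns what reached
    // Kafka. After a restore the source resumes at offset 2, so "a" and "b" are never re-emitted.
    static List<String> savepointThenCancel(boolean flushOnCheckpoint, boolean checkpointingEnabled) {
        BaseProducerModel job = new BaseProducerModel(flushOnCheckpoint, checkpointingEnabled);
        job.emit("a");
        job.emit("b");
        job.savepoint();
        job.cancel();
        return job.topic;
    }

    public static void main(String[] args) {
        System.out.println(savepointThenCancel(true, true));  // [a, b]
        System.out.println(savepointThenCancel(true, false)); // []: "a" and "b" are lost
    }
}
```

With checkpointing disabled, the savepoint still stores source offset 2, so nothing re-emits "a" and "b" after the restore. That is the gap in the at-least-once guarantee the question is about.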

The only thing I can think of is to have checkpoints enabled with a very long
interval, so that they are (almost) never triggered. But this is a hack.

Tomas Witzany