Hi Yordan,

Indeed, it looks like a missing feature. Probably whoever implemented the
new KafkaSink didn't realize how important this is. I've created a ticket
for this issue [1], but I don't know when, or by whom, it could be fixed.

I think a workaround might be to create a new `KafkaSink` instance with a
new, different operator uid, and simply drop/ignore the old instance and
its state (by using the `allowNonRestoredState` option [2]).
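
A rough sketch of that workaround in code, assuming Flink's KafkaSink from
flink-connector-kafka (the topic, broker address, and uid strings below are
placeholders, not values from the original job):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;

// ... inside the job definition, given some DataStream<String> stream:
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")          // placeholder
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")    // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .build();

// Attach the sink under a NEW uid so Flink treats it as a brand-new
// operator with no prior state to restore:
stream.sinkTo(sink).uid("kafka-sink-v2");
```

When resubmitting from the old savepoint, pass `--allowNonRestoredState`
(short form `-n`) to `flink run`, so the state of the old sink operator is
silently dropped instead of failing the restore.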

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-30068
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#allowing-non-restored-state


Wed, 16 Nov 2022 at 11:36 Yordan Pavlov <y.d.pav...@gmail.com> wrote:

> Hi Piotr,
>
> The option you mention applies only to the deprecated KafkaProducer;
> is there an equivalent for the modern KafkaSink? I found this article
> comparing the behavior of the two:
>
> https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs
>
> It suggests that the default behavior of KafkaSink would be: "The
> recovery continues with an ERROR message like the following is
> logged:". However, this is not what I observe; instead, the job fails.
> I am attaching the relevant part of the log. This error happens when
> trying to recover from a one-month-old savepoint.
>
> Regards,
> Yordan
>
> On Tue, 15 Nov 2022 at 18:53, Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> > Hi Yordan,
> >
> > I don't understand where the problem is. Why do you think savepoints are
> > unusable? If you recover with `ignoreFailuresAfterTransactionTimeout`
> > enabled, the current Flink behaviour shouldn't cause any problems (except
> > for maybe some logged errors).
> >
> > Best,
> > Piotrek
> >
> > Tue, 15 Nov 2022 at 15:36 Yordan Pavlov <y.d.pav...@gmail.com> wrote:
> >>
> >> Hi,
> >> we are using Flink savepoints as a recovery tool and want to keep
> >> multiple ones covering the past months. However, since we use Kafka
> >> transactions in our KafkaSink, this puts an expiration time on our
> >> savepoints: we can only use a savepoint as old as our Kafka
> >> transaction timeout. The problem is explained in this issue:
> >> https://issues.apache.org/jira/browse/FLINK-16419
> >> the relative comment being this one:
> >> "FlinkKafkaProducer or KafkaSink do not know during recovery if they
> >> have to recover and commit or if it has already happened. Due to that,
> >> they are always attempting to recover and commit transactions during
> >> startup."
> >> I'm surprised that more people are not hitting this problem, as it
> >> makes savepoints pretty much unusable as a recovery mechanism.
>
