Hi Ruben,

From the provided screenshot, it seems to me that the pipeline is backpressured by the sink. Can you please share your checkpoint configuration? Are you using unaligned checkpoints? What is the checkpointing interval, and what volume of data is coming in from the source? With EOS, data is committed only after a checkpoint completes; until then it is buffered in state, which makes the sink more resource-intensive.
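
To illustrate why the interval and the input volume matter, here is a back-of-the-envelope sketch in Java of how much data the sink would hold between two checkpoints, assuming everything written since the last checkpoint stays buffered until the next one completes. The class and method names, the throughput, the record size, and the interval are all made-up illustrations, not values from your pipeline, and the estimate ignores serialization and state-backend overhead.

```java
final class EosBufferEstimate {

    // Rough estimate of the bytes an exactly-once sink buffers between two
    // checkpoints: everything that arrives during one checkpoint interval.
    // Assumption: a steady input rate and a fixed average record size.
    public static long bufferedBytes(long recordsPerSecond,
                                     long avgRecordBytes,
                                     long checkpointIntervalMillis) {
        return recordsPerSecond * avgRecordBytes * checkpointIntervalMillis / 1000L;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 50k records/s, 1 KiB each, 60 s interval.
        long bytes = bufferedBytes(50_000L, 1_024L, 60_000L);
        System.out.println((bytes / (1024L * 1024L)) + " MiB buffered per checkpoint interval");
    }
}
```

If the result is large compared to the memory or state capacity available to the sink operators, a shorter checkpointing interval or more shards would be the first things to try.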

 Jan

On 6/18/24 05:30, Ruben Vargas wrote:
Attached a better image of the console.

Thanks!

On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <ruben.var...@metova.com> wrote:
Hello guys

Wondering if some of you have experience enabling Exactly Once in
KafkaIO with the Flink runner? I enabled it and now I'm facing an issue
where all the checkpoints are failing. I cannot see any exception in
the logs.

The Flink console only mentions this: "Asynchronous task checkpoint
failed." I also noticed that some operators don't acknowledge the
checkpoint (screenshot attached).

I did this:

1) KafkaIO.Read:

update consumer properties with enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()

2) KafkaIO#write:

.withEOS(numShards, sinkGroupId)

But my application is not able to deliver messages to the output topic
because the checkpoints are failing.
I also reviewed the timeouts and other time-sensitive parameters; those
are set high right now.

I really appreciate your guidance on this. Thank you
