Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it explains it quite well. Generally, I would prefer unaligned checkpoints in this case.

Another thing to consider is the number of shards of the EOS sink. Because of how the shards are distributed among workers, it might be a good idea to increase that to a number higher than the number of target partitions (e.g. targetPartitions * 10 or so). An additional thing to consider is increasing the maxParallelism of the pipeline (the maximum value is 32768), as it also affects how 'evenly' Flink assigns shards to workers. You can check whether the assignment is even using the counters in the sink operator(s).
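The two knobs above could be set roughly like this. A minimal sketch, assuming Beam's Flink runner options; the broker address, topic, group id and partition count are placeholders, and exact option names may vary slightly between Beam versions:

```java
// Oversubscribe EOS sink shards and raise Flink's maxParallelism.
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setMaxParallelism(32768); // max key groups; affects how shards spread

int targetPartitions = 16; // partitions of the output topic (example value)
KafkaIO.<Void, String>write()
    .withBootstrapServers("broker:9092")     // placeholder
    .withTopic("mytopic")                    // placeholder
    .withEOS(targetPartitions * 10, "my-sink-group"); // oversubscribed shards
```

The counters mentioned above should then show whether the larger shard count actually spreads work more evenly across workers.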

 Jan

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

On 6/19/24 05:15, Ruben Vargas wrote:
Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.
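For reference, a hedged sketch of what such a factory function can look like. `KafkaConsumerFactory` above is the poster's own class, so everything here is illustrative: the SASL mechanism and credentials are placeholders, and the point is only that the factory lets the same security settings reach the metadata consumer:

```java
// Illustrative consumer factory that forwards SASL settings to every
// consumer KafkaIO creates, including the small metadata consumer.
KafkaIO.<Void, V>read()
    .withConsumerFactoryFn(
        (Map<String, Object> config) -> {
          config.put("security.protocol", "SASL_SSL");        // placeholder
          config.put("sasl.mechanism", "SCRAM-SHA-512");      // placeholder
          config.put("sasl.jaas.config",
              "org.apache.kafka.common.security.scram.ScramLoginModule"
                  + " required username=\"user\" password=\"secret\";");
          return new KafkaConsumer<>(config);
        })
```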

There are other couple things I noticed:

  - Now I have a lot of backpressure. I assigned 3x resources to the
cluster and even with that the backpressure is high. Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic 'mytopic', group id 'myconsumergroup'"

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <ruben.var...@metova.com> wrote:
Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


In the end I found one problem that was preventing Flink from doing
the checkpointing: a DoFn that had some non-serializable fields. I
made those transient and initialized them in the setup method.
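The transient-plus-setup pattern can be shown without any Beam dependency. A minimal sketch in plain Java (class and field names are made up for illustration); in a real DoFn the re-initialization would live in a method annotated with @Setup, while here a readObject hook stands in for it:

```java
import java.io.*;

// A DoFn-like class whose non-serializable member is marked transient
// and re-created after deserialization.
class ParserFn implements Serializable {
  // StringWriter is not Serializable; without "transient" this field
  // would make the whole object fail to serialize.
  private transient StringWriter buffer;

  ParserFn() { setup(); }

  // In Beam, this logic would go in a @Setup-annotated method.
  void setup() { buffer = new StringWriter(); }

  private void readObject(ObjectInputStream in)
      throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    setup(); // transient fields come back null; re-initialize them
  }

  String process(String element) {
    buffer.getBuffer().setLength(0);
    buffer.write(element.toUpperCase());
    return buffer.toString();
  }
}

public class TransientDemo {
  public static void main(String[] args) throws Exception {
    // Round-trip through Java serialization, as the runner does when
    // shipping the DoFn to workers.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    new ObjectOutputStream(bos).writeObject(new ParserFn());
    ParserFn copy = (ParserFn) new ObjectInputStream(
        new ByteArrayInputStream(bos.toByteArray())).readObject();
    System.out.println(copy.process("checkpoint ok"));
  }
}
```

Without the transient keyword the writeObject call would throw NotSerializableException, which matches the symptom described above.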

Weird, because I was usually able to detect these kinds of errors just
by running in the direct runner, or even in Flink before enabling EOS.


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried extending the timeout and it didn't work; my shards are equal
to my number of partitions.
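One hedged note on where that timeout lives: default.api.timeout.ms is a Kafka *consumer* setting, so one place to raise it is the consumer config passed to KafkaIO.read(); whether the EOS sink's internal client also honors it may depend on the Beam version. The value below is illustrative only:

```java
// Raise the consumer-side API timeout (illustrative value).
Map<String, Object> updates = new HashMap<>();
updates.put("default.api.timeout.ms", 120000);

KafkaIO.<Void, V>read()
    .withConsumerConfigUpdates(updates)
```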

I appreciate any kind of guidance

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
I'd suggest:
  a) use unaligned checkpoints, if possible

  b) verify the number of buckets you use for EOS sink, this limits parallelism 
[1].

Best,

  Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-

On 6/18/24 09:32, Ruben Vargas wrote:

Hello Lukavsky

Thanks for your reply !

I thought it was due to backpressure, but I increased the resources of the
cluster and the problem still persists. More than that, data stops flowing
and the checkpoints still fail.

I have configured checkpointing to run every minute. The timeout is 1h. It
is an aligned checkpoint.

El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský <je...@seznam.cz> 
escribió:
Hi Ruben,

from the provided screenshot it seems to me that the pipeline is
backpressured by the sink. Can you please share your checkpoint
configuration? Are you using unaligned checkpoints? What is the
checkpointing interval and the volume of data coming in from the source?
With EOS, data is committed after the checkpoint; before that, the data is
buffered in state, which makes the sink more resource intensive.
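For reference, the checkpoint settings in question can be set through the Flink runner's pipeline options. A sketch, with illustrative values; setUnalignedCheckpointEnabled exists only in more recent Beam versions, so check your FlinkPipelineOptions javadoc:

```java
// Checkpoint-related Flink runner options in Beam (values illustrative).
FlinkPipelineOptions opts = options.as(FlinkPipelineOptions.class);
opts.setCheckpointingInterval(60_000L);      // checkpoint once per minute
opts.setCheckpointTimeoutMillis(3_600_000L); // 1h checkpoint timeout
// Unaligned checkpoints can help when the sink backpressures the pipeline:
opts.setUnalignedCheckpointEnabled(true);    // availability depends on Beam version
```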

   Jan

On 6/18/24 05:30, Ruben Vargas wrote:
Attached a better image of the console.

Thanks!

On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <ruben.var...@metova.com> wrote:
Hello guys

Wondering if some of you have experiences enabling Exactly Once in
KafkaIO with Flink runner? I enabled it and now I'm facing an issue
where all the checkpoints are failing. I cannot see any exception on
the logs.

Flink console only mentions this "Asynchronous task checkpoint
failed." I also noticed that some operators don't acknowledge the
checkpointing  (Attached a screenshot).

I did this:

1) KafkaIO.Read:

update consumer properties with enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()

2) KafkaIO#write:

.withEOS(numShards, sinkGroupId)
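The two steps above, assembled into one hedged sketch (broker address, topic names, types and the elided middle of the pipeline are placeholders; numShards and sinkGroupId are the values from step 2):

```java
// Step 1: read with committed reads and finalized offsets.
Map<String, Object> consumerUpdates = new HashMap<>();
consumerUpdates.put("enable.auto.commit", false);

pipeline
    .apply(KafkaIO.<Void, String>read()
        .withBootstrapServers("broker:9092")   // placeholder
        .withTopic("input-topic")              // placeholder
        .withConsumerConfigUpdates(consumerUpdates)
        .withReadCommitted()
        .commitOffsetsInFinalize())
    // ... transforms producing KVs for the sink ...
    // Step 2: write with exactly-once semantics.
    .apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("broker:9092")   // placeholder
        .withTopic("output-topic")             // placeholder
        .withEOS(numShards, sinkGroupId));
```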

But my application is not able to deliver messages to the output topic
due to the checkpoints failing.
I also reviewed the timeout and other time sensitive parameters, those
are high right now.

I really appreciate your guidance on this. Thank you
