I don't use Kafka transactions, so I can only speculate. It seems that the transaction times out before being committed. Looking into the code [1], this could happen if there is a *huge* amount of work between checkpoints (i.e. checkpoints do not happen often enough). I'd suggest checking the logs for messages coming from the KafkaExactlyOnceSink.

 Jan

[1] https://github.com/apache/beam/blob/a944bf87cd03d32105d87fc986ecba5b656683bc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L245
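
If the hypothesis above holds (the transaction times out before it is committed), one setting worth checking is the Kafka producer's transaction.timeout.ms, whose documented default is 60000 ms. The sketch below only builds and validates a config map; the class name, the helper name and the assumed broker limit (Kafka's documented default of 15 minutes for transaction.max.timeout.ms) are illustrative assumptions, and nothing here was tested against the pipeline in this thread.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or Kafka.
class TxnTimeoutSketch {
    // Assumption: the broker still uses Kafka's documented default for
    // transaction.max.timeout.ms (15 minutes). Check your broker config.
    static final Duration ASSUMED_BROKER_MAX = Duration.ofMinutes(15);

    // Builds producer config updates that raise transaction.timeout.ms,
    // rejecting values the broker would refuse under the assumption above.
    static Map<String, Object> producerUpdates(Duration txnTimeout) {
        if (txnTimeout.isZero() || txnTimeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        if (txnTimeout.compareTo(ASSUMED_BROKER_MAX) > 0) {
            throw new IllegalArgumentException(
                "timeout exceeds the assumed broker transaction.max.timeout.ms");
        }
        Map<String, Object> updates = new HashMap<>();
        // The producer expects this setting as an int number of milliseconds.
        updates.put("transaction.timeout.ms", Math.toIntExact(txnTimeout.toMillis()));
        return updates;
    }
}
```

A map like this could then be handed to the sink through a producer-config hook such as KafkaIO's withProducerConfigUpdates. Whether a longer timeout actually resolves the failure is unknown; the sink's logs remain the first thing to check.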

On 6/24/24 16:35, Ruben Vargas wrote:
On Mon, Jun 24, 2024 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

the distribution of keys to workers might not be uniform, when the
number of keys is comparable to total parallelism. General advice would be:

   a) try to increase number of keys (EOS parallelism in this case) to be
at least several times higher than parallelism
Makes sense. Unfortunately, I got an error when I tried to set
shards > partitions. :(

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat

Do I need to move any configuration to do that?

Thanks

   b) increase maxParallelism (default 128, maximum 32768), as it might
influence the assignment of keys to downstream workers

Best,

   Jan

On 6/21/24 05:25, Ruben Vargas wrote:
The image was not correctly attached. Sending it again, sorry.

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas <ruben.var...@metova.com> wrote:
Hello guys, me again

I was trying to debug the issue with the backpressure and I noticed
that even if I set shards = 16, not all tasks are receiving
messages (attaching screenshot). Do you know of potential causes and
solutions?

I really appreciate any help you can provide


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas <ruben.var...@metova.com> wrote:
Hello again

Thank you for all the suggestions.

Unfortunately, if I set more shards than partitions, it throws this exception:

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
60000ms while awaiting AddOffsetsToTxn\n",


Any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it
explains it quite well. Generally, I would prefer unaligned checkpoints
in this case.

Another thing to consider is the number of shards of the EOS sink.
Because of how the shards are distributed among workers, it might be a good
idea to actually increase that to some number higher than the number of
target partitions (e.g. targetPartitions * 10 or so). An additional thing
to consider is increasing maxParallelism of the pipeline (the maximum value
is 32768), as it also affects how 'evenly' Flink assigns shards to
workers. You can check if the assignment is even using counters in the
sink operator(s).

    Jan

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
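
The effect described above (few shards spread unevenly over workers) can be explored with a toy simulation. The two-step mapping below, key hash modulo maxParallelism to get a key group, then keyGroup * parallelism / maxParallelism to get the subtask, is modeled on how Flink's key-group assignment is generally described. The hash is a generic stand-in mixer, not Flink's actual function, and Beam encodes keys before Flink hashes them, so the concrete numbers will not match a real job. Class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of shard-to-subtask assignment; illustrative only.
class ShardAssignmentSketch {
    // Stand-in integer mixer (NOT Flink's hash). Returns a non-negative int.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7fffffff;
    }

    // Step 1: a key (here the shard id) falls into one of maxParallelism key groups.
    static int keyGroupFor(int shardId, int maxParallelism) {
        return mix(shardId) % maxParallelism;
    }

    // Step 2: contiguous ranges of key groups are owned by each parallel subtask.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many shards each subtask receives, including subtasks with none.
    static Map<Integer, Integer> shardsPerSubtask(int numShards, int parallelism, int maxParallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            counts.put(subtask, 0);
        }
        for (int shard = 0; shard < numShards; shard++) {
            int keyGroup = keyGroupFor(shard, maxParallelism);
            counts.merge(subtaskFor(keyGroup, parallelism, maxParallelism), 1, Integer::sum);
        }
        return counts;
    }

    // Number of subtasks that received no shard at all.
    static long idleSubtasks(Map<Integer, Integer> counts) {
        return counts.values().stream().filter(c -> c == 0).count();
    }
}
```

Comparing, say, shardsPerSubtask(16, 8, 128) with shardsPerSubtask(160, 8, 32768) gives a feel for why more shards and a larger maxParallelism tend to even out the distribution. The sink counters mentioned above are the way to confirm it on the real pipeline.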

On 6/19/24 05:15, Ruben Vargas wrote:
Hello guys

I was now able to get past that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))

This is because my cluster uses the SASL authentication mechanism, and the
small consumer created to fetch the topic's metadata was throwing that
error.
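
For readers hitting the same problem: the KafkaConsumerFactory class above is Ruben's own and is not shown in the thread. A minimal sketch of the part that matters, merging SASL settings into the config the metadata consumer receives, might look like the following. The config keys are standard Kafka client settings; the SASL_SSL protocol, the PLAIN mechanism, and the class and method names are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the actual factory used in the thread is not shown.
class SaslConsumerConfigSketch {
    // Returns a copy of the base consumer config with SASL settings added.
    // Assumption: the cluster uses SASL_SSL with the PLAIN mechanism.
    static Map<String, Object> withSasl(Map<String, Object> base, String username, String password) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.put("security.protocol", "SASL_SSL");
        merged.put("sasl.mechanism", "PLAIN");
        merged.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return merged;
    }
}
```

A serializable factory passed to withConsumerFactoryFn could then construct its consumer from the merged map, so that the small metadata consumer authenticates the same way as the rest of the pipeline.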

There are a couple of other things I noticed:

    - Now I have a lot of backpressure. I assigned 3x the resources to the
cluster and even with that the backpressure is high. Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

    - I get this error:
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <ruben.var...@metova.com> wrote:
Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


In the end I found one problem that was preventing Flink from doing
the checkpointing. It was a DoFn that had some "non
serializable" objects, so I made those transient and initialized them
in the setup method.
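
The fix described above can be sketched in plain Java. The classes below are stand-ins: EnrichFn plays the role of a Beam DoFn (which must be serializable), and setup() plays the role of a method that would carry Beam's @Setup annotation. All names are hypothetical, and the round trip only mimics the serialization a runner performs when shipping the function to workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldSketch {
    // Stand-in for a client object that does not implement Serializable.
    static class NonSerializableClient {
        String call(String in) { return "processed:" + in; }
    }

    // Stand-in for a DoFn: serializable config plus a transient helper.
    static class EnrichFn implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String endpoint;                   // travels with the function
        private transient NonSerializableClient client;  // skipped by Java serialization

        EnrichFn(String endpoint) { this.endpoint = endpoint; }

        // In a real Beam DoFn this method would be annotated with @Setup.
        void setup() { client = new NonSerializableClient(); }

        String process(String element) { return client.call(endpoint + "/" + element); }
    }

    // Serializes and deserializes the function, as a runner would before execution.
    static EnrichFn roundTrip(EnrichFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (EnrichFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Without the transient keyword, writeObject would fail with a NotSerializableException for the client field. With it, the deserialized copy arrives with a null client, which is why the initialization has to happen in setup rather than in the constructor.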

Weird, because I was usually able to detect these kinds of errors just
by running in the direct runner, or even in Flink before enabling EOS.


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried to extend the timeout and it didn't work. My shards are equal
to my number of partitions.

I appreciate any kind of guidance

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
I'd suggest:
    a) use unaligned checkpoints, if possible

    b) verify the number of buckets you use for EOS sink, this limits 
parallelism [1].

Best,

    Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-

On 6/18/24 09:32, Ruben Vargas wrote:

Hello Lukavsky

Thanks for your reply !

I thought it was due to backpressure, but I increased the resources of the cluster
and the problem still persists. More than that, data stops flowing and the checkpoints
still fail.

I have configured checkpointing to run every minute. The timeout is 1h. It is
an aligned checkpoint.

On Tue, Jun 18, 2024 at 1:14 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Ruben,

from the provided screenshot it seems to me that the pipeline is
backpressured by the sink. Can you please share your checkpoint
configuration? Are you using unaligned checkpoints? What is the
checkpointing interval and the volume of data coming in from the source?
With EOS, data is committed after the checkpoint; before that, the data is
buffered in state, which makes the sink more resource intensive.

     Jan

On 6/18/24 05:30, Ruben Vargas wrote:
Attached a better image of the console.

Thanks!

On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <ruben.var...@metova.com> wrote:
Hello guys

Wondering if some of you have experience enabling Exactly Once in
KafkaIO with the Flink runner? I enabled it and now I'm facing an issue
where all the checkpoints are failing. I cannot see any exception on
the logs.

The Flink console only mentions this: "Asynchronous task checkpoint
failed." I also noticed that some operators don't acknowledge the
checkpointing (attached a screenshot).

I did this:

1) KafkaIO.Read:

update consumer properties with enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()

2) KafkaIO#write:

.withEOS(numShards, sinkGroupId)

But my application is not able to deliver messages to the output topic
due to the checkpoints failing.
I also reviewed the timeout and other time-sensitive parameters; those
are high right now.

I really appreciate your guidance on this. Thank you
