Hello guys, me again. I was trying to debug the backpressure issue and I noticed that even if I set shards = 16, not all tasks are receiving messages (attaching screenshot). Do you know potential causes and solutions?
I really appreciate any help you can provide. Thank you very much!

Regards

On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas <ruben.var...@metova.com> wrote:
>
> Hello again
>
> Thank you for all the suggestions.
>
> Unfortunately, if I put more shards than partitions it throws this exception:
>
> "message":
> "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> ids -> ToGBKResult ->
> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException\n\tat
> ..
> ..
> ..
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> 60000ms while awaiting AddOffsetsToTxn\n",
>
> Any other alternative? Thank you very much!
>
> Regards
>
> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > Hi,
> >
> > regarding aligned vs unaligned checkpoints, I recommend reading [1]; it
> > explains it quite well. Generally, I would prefer unaligned checkpoints
> > in this case.
> >
> > Another thing to consider is the number of shards of the EOS sink.
> > Because of how the shards are distributed among workers, it might be a
> > good idea to increase that to some number higher than the number of
> > target partitions (e.g. targetPartitions * 10 or so). An additional
> > thing to consider is increasing the maxParallelism of the pipeline
> > (e.g. the max value is 32768), as it also affects how 'evenly' Flink
> > assigns shards to workers. You can check whether the assignment is even
> > using counters in the sink operator(s).
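For reference, Jan's point about maxParallelism can be illustrated with a simplified, self-contained model of how Flink maps keys (here, EOS shard ids) to key groups and then to subtasks. This is an assumption-laden sketch: Flink's real logic lives in `KeyGroupRangeAssignment` and applies an extra murmur hash on top of `hashCode()`, which this model drops for brevity. With few shard keys relative to the parallelism, hash collisions can leave some subtasks with zero shards, which matches the "not all tasks are receiving messages" symptom:

```java
import java.util.TreeMap;

public class ShardAssignmentModel {
    // Simplified stand-in for Flink's key-group computation
    // (assumption: real code is murmurHash(key.hashCode()) % maxParallelism).
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // This formula matches Flink's key-group-to-operator mapping.
    static int subtask(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Count how many shard keys land on each subtask.
    static TreeMap<Integer, Integer> distribute(int shards, int parallelism, int maxParallelism) {
        TreeMap<Integer, Integer> counts = new TreeMap<>();
        for (int shard = 0; shard < shards; shard++) {
            int st = subtask(keyGroup("shard-" + shard, maxParallelism), parallelism, maxParallelism);
            counts.merge(st, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // With only 16 shard keys over 8 subtasks, some subtasks may get 0
        // shards; many more shards (e.g. partitions * 10) smooths this out.
        System.out.println("16 shards  -> " + distribute(16, 8, 128));
        System.out.println("160 shards -> " + distribute(160, 8, 128));
    }
}
```

Running this shows why "shards = partitions" can still leave idle subtasks even though the math looks balanced on paper.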
> >
> > Jan
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> >
> > On 6/19/24 05:15, Ruben Vargas wrote:
> > > Hello guys
> > >
> > > Now I was able to get past that error.
> > >
> > > I had to set the consumer factory function:
> > > .withConsumerFactoryFn(new KafkaConsumerFactory<Void, V>(config))
> > >
> > > This is because my cluster uses the SASL authentication mechanism, and
> > > the small consumer created to fetch the topic metadata was throwing
> > > that error.
> > >
> > > There are a couple of other things I noticed:
> > >
> > > - Now I have a lot of backpressure. I assigned 3x the resources to the
> > > cluster and even with that the backpressure is high. Any advice on
> > > this? I already increased the shards to equal the number of partitions
> > > of the destination topic.
> > >
> > > - I have an error where
> > > "State exists for shard mytopic-0, but there is no state stored with
> > > Kafka topic mytopic' group id myconsumergroup'
> > >
> > > The only way I found to recover from this error is to change the group
> > > name. Any other advice on how to recover from this error?
> > >
> > > Thank you very much for following up on this!
> > >
> > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <ruben.var...@metova.com>
> > > wrote:
> > >> Hello Jan
> > >>
> > >> Thanks for the suggestions.
> > >>
> > >> Is there any benefit to using aligned vs unaligned?
> > >>
> > >> In the end I found one problem that was preventing Flink from doing
> > >> the checkpointing. It was a DoFn that had some non-serializable
> > >> objects, so I made those transient and initialized them in setup.
> > >>
> > >> Weird, because I was usually able to detect these kinds of errors
> > >> just by running in the direct runner, or even in Flink before
> > >> enabling EOS.
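On the SASL point above: the metadata consumer created by KafkaIO needs the same security settings as the main consumer, which is why passing them through the factory's config works. A typical set of consumer properties for a SASL-secured cluster might look like the fragment below (illustrative values only; the actual mechanism, credentials, and protocol depend on the cluster):

```properties
# Assumed example: SASL_SSL with SCRAM; substitute your cluster's mechanism.
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<user>" \
  password="<password>";
```

These are standard Kafka client configuration keys; whatever properties the pipeline's main consumer uses should be mirrored into the config handed to the factory function.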
> > >>
> > >> Now I'm facing another weird issue:
> > >>
> > >> org.apache.beam.sdk.util.UserCodeException:
> > >> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> > >> expired before the last committed offset for partitions
> > >> [behavioral-signals-6] could be determined. Try tuning
> > >> default.api.timeout.ms larger to relax the threshold.
> > >> at
> > >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> > >> at
> > >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> > >> Source)
> > >> at
> > >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> > >>
> > >> I tried extending the timeout and it didn't work; my shards are equal
> > >> to my number of partitions.
> > >>
> > >> I appreciate any kind of guidance.
> > >>
> > >> Thanks.
> > >>
> > >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >>> I'd suggest:
> > >>> a) use unaligned checkpoints, if possible
> > >>>
> > >>> b) verify the number of buckets you use for the EOS sink; this
> > >>> limits parallelism [1].
> > >>>
> > >>> Best,
> > >>>
> > >>> Jan
> > >>>
> > >>> [1]
> > >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> > >>>
> > >>> On 6/18/24 09:32, Ruben Vargas wrote:
> > >>>
> > >>> Hello Lukavský
> > >>>
> > >>> Thanks for your reply!
> > >>>
> > >>> I thought it was due to backpressure, but I increased the resources
> > >>> of the cluster and the problem still persists. More than that, data
> > >>> stops flowing and the checkpoint still fails.
> > >>>
> > >>> I have configured the checkpoint to run every minute. The timeout is
> > >>> 1h. It is an aligned checkpoint.
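Regarding the TimeoutException above: the message points at `default.api.timeout.ms`, but with an EOS sink the producer's transaction timeout matters too, since uncommitted transactions are only finalized after a successful checkpoint. A hedged example of the relevant client properties (values are illustrative, not recommendations):

```properties
# Consumer side: the timeout named in the error message.
default.api.timeout.ms=120000

# Producer side (EOS): transaction timeout should comfortably exceed the
# checkpoint interval, but must not exceed the broker's
# transaction.max.timeout.ms (broker default is 15 minutes).
transaction.timeout.ms=900000
```

If checkpoints are slow or failing, transactions can sit open long enough to hit whichever of these limits is lowest, so it is worth checking the broker-side `transaction.max.timeout.ms` as well.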
> > >>>
> > >>> On Tue, Jun 18, 2024 at 1:14 AM Jan Lukavský
> > >>> <je...@seznam.cz> wrote:
> > >>>> Hi Ruben,
> > >>>>
> > >>>> from the provided screenshot it seems to me that the pipeline is
> > >>>> backpressured by the sink. Can you please share your checkpoint
> > >>>> configuration? Are you using unaligned checkpoints? What is the
> > >>>> checkpointing interval and the volume of data coming in from the
> > >>>> source?
> > >>>> With EOS, data is committed after the checkpoint; before that, the
> > >>>> data is buffered in state, which makes the sink more resource
> > >>>> intensive.
> > >>>>
> > >>>> Jan
> > >>>>
> > >>>> On 6/18/24 05:30, Ruben Vargas wrote:
> > >>>>> Attached a better image of the console.
> > >>>>>
> > >>>>> Thanks!
> > >>>>>
> > >>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas
> > >>>>> <ruben.var...@metova.com> wrote:
> > >>>>>> Hello guys
> > >>>>>>
> > >>>>>> I'm wondering if any of you have experience enabling Exactly Once
> > >>>>>> in KafkaIO with the Flink runner? I enabled it and now I'm facing
> > >>>>>> an issue where all the checkpoints are failing. I cannot see any
> > >>>>>> exception in the logs.
> > >>>>>>
> > >>>>>> The Flink console only mentions this: "Asynchronous task
> > >>>>>> checkpoint failed." I also noticed that some operators don't
> > >>>>>> acknowledge the checkpointing (attached a screenshot).
> > >>>>>>
> > >>>>>> I did this:
> > >>>>>>
> > >>>>>> 1) KafkaIO.Read:
> > >>>>>>
> > >>>>>> update consumer properties with enable.auto.commit = false
> > >>>>>> .withReadCommitted()
> > >>>>>> .commitOffsetsInFinalize()
> > >>>>>>
> > >>>>>> 2) KafkaIO#write:
> > >>>>>>
> > >>>>>> .withEOS(numShards, sinkGroupId)
> > >>>>>>
> > >>>>>> But my application is not able to deliver messages to the output
> > >>>>>> topic due to the checkpoints failing.
> > >>>>>> I also reviewed the timeout and other time-sensitive parameters;
> > >>>>>> those are high right now.
> > >>>>>>
> > >>>>>> I really appreciate your guidance on this. Thank you
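For anyone landing on this thread later: the aligned-vs-unaligned checkpoint settings discussed above live in the Flink cluster configuration. A minimal sketch of the relevant `flink-conf.yaml` keys, assuming a per-minute interval and a 1h timeout as described in the thread (values are the thread's, not recommendations):

```yaml
execution.checkpointing.interval: 60s
execution.checkpointing.timeout: 60min
execution.checkpointing.mode: EXACTLY_ONCE
# Jan's suggestion: switch to unaligned checkpoints under backpressure.
execution.checkpointing.unaligned: true
```

When running Beam on the Flink runner, the checkpointing interval can also be set through the runner's pipeline options (e.g. `--checkpointingInterval=60000`); check the FlinkPipelineOptions documentation for the exact flags available in your Beam version.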