Image was not correctly attached; sending it again. Sorry, and thanks!
On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas <ruben.var...@metova.com> wrote:
>
> Hello guys, me again
>
> I was trying to debug the issue with the backpressure and I noticed
> that even if I set shards = 16, not all tasks are receiving
> messages (attaching screenshot). Do you know potential causes and
> solutions?
>
> I really appreciate any help you can provide.
>
> Thank you very much!
>
> Regards.
>
>
> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas <ruben.var...@metova.com> wrote:
> >
> > Hello again
> >
> > Thank you for all the suggestions.
> >
> > Unfortunately, if I put more shards than partitions, it throws this
> > exception:
> >
> > "message":
> > "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> > ids -> ToGBKResult ->
> > PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> > to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> > (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> > FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> > java.lang.RuntimeException:
> > java.lang.reflect.InvocationTargetException\n\tat
> > ..
> > ..
> > ..
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> > java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> > org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> > 60000ms while awaiting AddOffsetsToTxn\n",
> >
> > Any other alternative? Thank you very much!
> >
> > Regards
> >
> > On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > > Hi,
> > >
> > > regarding aligned vs unaligned checkpoints I recommend reading [1]; it
> > > explains it quite well. Generally, I would prefer unaligned checkpoints
> > > in this case.
> > >
> > > Another thing to consider is the number of shards of the EOS sink.
> > > Because of how the shards are distributed among workers, it might be a good
> > > idea to actually increase that to some number higher than the number of
> > > target partitions (e.g. targetPartitions * 10 or so). An additional thing
> > > to consider is increasing the maxParallelism of the pipeline (the max value
> > > is 32768), as it also affects how 'evenly' Flink assigns shards to
> > > workers. You can check if the assignment is even using counters in the
> > > sink operator(s).
> > >
> > > Jan
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> > >
> > > On 6/19/24 05:15, Ruben Vargas wrote:
> > > > Hello guys
> > > >
> > > > Now I was able to get past that error.
> > > >
> > > > I had to set the consumer factory function
> > > > .withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))
> > > >
> > > > This is because my cluster uses the SASL authentication mechanism, and the
> > > > small consumer created to fetch the topic metadata was throwing that
> > > > error.
> > > >
> > > > There are a couple of other things I noticed:
> > > >
> > > > - Now I have a lot of backpressure. I assigned 3x resources to the
> > > > cluster and even with that the backpressure is high. Any advice on
> > > > this? I already increased the shards to equal the number of partitions
> > > > of the destination topic.
> > > >
> > > > - I have an error where
> > > > "State exists for shard mytopic-0, but there is no state stored with
> > > > Kafka topic mytopic' group id myconsumergroup'
> > > >
> > > > The only way I found to recover from this error is to change the group
> > > > name. Any other advice on how to recover from this error?
> > > >
> > > >
> > > > Thank you very much for following this up!
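Jan's point about shard counts and maxParallelism comes down to how Flink maps keys (here, EOS sink shard ids) to key groups, and key groups to subtasks. The sketch below is a simplified, hypothetical model of that assignment: Flink's real logic lives in `KeyGroupRangeAssignment` and murmur-hashes `key.hashCode()`, while the mixer here only approximates it, so exact counts will differ. It only illustrates why a small shard count can leave some subtasks with no work and why a larger maxParallelism smooths the assignment:

```java
import java.util.HashMap;
import java.util.Map;

public class ShardAssignmentSketch {

    // Murmur3-style finalizer standing in for Flink's murmur hash (approximation).
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Key group in [0, maxParallelism), analogous to Flink's computeKeyGroupForKeyHash.
    public static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Subtask index in [0, parallelism), analogous to computeOperatorIndexForKeyGroup.
    public static int operatorIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Count how many of `shards` shard ids land on each of `parallelism` subtasks.
    public static Map<Integer, Integer> distribution(int shards, int maxParallelism, int parallelism) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int shard = 0; shard < shards; shard++) {
            counts.merge(operatorIndex(shard, maxParallelism, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // With only 16 shards into 8 subtasks, the hash lottery often leaves
        // some subtasks with 0 shards; 160 shards spread much more evenly.
        System.out.println(distribution(16, 128, 8));
        System.out.println(distribution(160, 32768, 8));
    }
}
```

This is also why Jan suggests counters in the sink operators: they reveal the actual per-subtask shard counts, which this model can only approximate.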
> > > >
> > > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <ruben.var...@metova.com>
> > > > wrote:
> > > >> Hello Jan
> > > >>
> > > >> Thanks for the suggestions.
> > > >>
> > > >> Any benefit of using aligned vs unaligned?
> > > >>
> > > >> In the end I found one problem that was preventing Flink from doing
> > > >> the checkpointing. It was a DoFn that had some "non
> > > >> serializable" objects, so I made those transient and initialized them
> > > >> in setup.
> > > >>
> > > >> Weird, because I usually was able to detect these kinds of errors just
> > > >> by running in the direct runner, or even in Flink before enabling EOS.
> > > >>
> > > >> Now I'm facing another weird issue:
> > > >>
> > > >> org.apache.beam.sdk.util.UserCodeException:
> > > >> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> > > >> expired before the last committed offset for partitions
> > > >> [behavioral-signals-6] could be determined. Try tuning
> > > >> default.api.timeout.ms larger to relax the threshold.
> > > >> at
> > > >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> > > >> at
> > > >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> > > >> Source)
> > > >> at
> > > >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> > > >>
> > > >> I tried to extend the timeout and it didn't work; my shards are equal
> > > >> to my number of partitions.
> > > >>
> > > >> I appreciate any kind of guidance.
> > > >>
> > > >> Thanks.
> > > >>
> > > >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
> > > >>> I'd suggest:
> > > >>> a) use unaligned checkpoints, if possible
> > > >>>
> > > >>> b) verify the number of buckets you use for the EOS sink; this limits
> > > >>> parallelism [1].
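The serialization fix Ruben describes above (non-serializable members made transient and re-created in setup) might look roughly like the sketch below. `EnrichFn` and `ExpensiveClient` are illustrative names, not from the thread, and this is a pattern sketch rather than anyone's actual code:

```java
// Sketch: the DoFn itself must be Java-serializable so the runner can ship
// it to workers; non-serializable members are marked transient and rebuilt
// in @Setup on each worker instead of being serialized with the DoFn.
class EnrichFn extends DoFn<String, String> {

  // transient: excluded from serialization, so the DoFn stays serializable
  private transient ExpensiveClient client;

  @Setup
  public void setup() {
    client = new ExpensiveClient();  // re-initialized per worker instance
  }

  @ProcessElement
  public void processElement(@Element String in, OutputReceiver<String> out) {
    out.output(client.enrich(in));
  }
}
```

As noted in the thread, the direct runner does not always exercise the same serialization paths as the Flink runner, which is why this class of bug can surface only on the cluster.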
> > > >>>
> > > >>> Best,
> > > >>>
> > > >>> Jan
> > > >>>
> > > >>> [1]
> > > >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> > > >>>
> > > >>> On 6/18/24 09:32, Ruben Vargas wrote:
> > > >>>
> > > >>> Hello Lukavský
> > > >>>
> > > >>> Thanks for your reply!
> > > >>>
> > > >>> I thought it was due to backpressure, but I increased the resources of
> > > >>> the cluster and the problem still persists. More than that, data stops
> > > >>> flowing and the checkpoints still fail.
> > > >>>
> > > >>> I have configured the checkpoint to run every minute. The timeout is
> > > >>> 1h. It is an aligned checkpoint.
> > > >>>
> > > >>> On Tue, Jun 18, 2024 at 1:14 AM Jan Lukavský
> > > >>> <je...@seznam.cz> wrote:
> > > >>>> Hi Ruben,
> > > >>>>
> > > >>>> from the provided screenshot it seems to me that the pipeline is
> > > >>>> backpressured by the sink. Can you please share your checkpoint
> > > >>>> configuration? Are you using unaligned checkpoints? What is the
> > > >>>> checkpointing interval and the volume of data coming in from the
> > > >>>> source?
> > > >>>> With EOS, data is committed after the checkpoint; before that, the data
> > > >>>> is buffered in state, which makes the sink more resource intensive.
> > > >>>>
> > > >>>> Jan
> > > >>>>
> > > >>>> On 6/18/24 05:30, Ruben Vargas wrote:
> > > >>>>> Attached a better image of the console.
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas
> > > >>>>> <ruben.var...@metova.com> wrote:
> > > >>>>>> Hello guys
> > > >>>>>>
> > > >>>>>> Wondering if some of you have experience enabling Exactly Once in
> > > >>>>>> KafkaIO with the Flink runner? I enabled it and now I'm facing an
> > > >>>>>> issue where all the checkpoints are failing. I cannot see any
> > > >>>>>> exception in the logs.
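For reference, the checkpoint settings discussed in this exchange (one-minute interval, 1h timeout, aligned vs unaligned) map onto Flink configuration roughly as below. Key names are taken from recent Flink versions and should be checked against the docs for the Flink version actually deployed; treat this as a sketch, not a verified config:

```yaml
# flink-conf.yaml (key names from recent Flink releases; verify per version)
execution.checkpointing.interval: 60s      # "per minute", as described above
execution.checkpointing.timeout: 1h        # the 1h timeout mentioned above
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.unaligned: true    # Jan's suggestion (a); default is aligned
```

Unaligned checkpoints let barriers overtake buffered in-flight data, which is why Jan prefers them for a backpressured pipeline like this one.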
> > > >>>>>>
> > > >>>>>> Flink console only mentions this: "Asynchronous task checkpoint
> > > >>>>>> failed." I also noticed that some operators don't acknowledge the
> > > >>>>>> checkpointing (attached a screenshot).
> > > >>>>>>
> > > >>>>>> I did this:
> > > >>>>>>
> > > >>>>>> 1) KafkaIO.Read:
> > > >>>>>>
> > > >>>>>> update consumer properties with enable.auto.commit = false
> > > >>>>>> .withReadCommitted()
> > > >>>>>> .commitOffsetsInFinalize()
> > > >>>>>>
> > > >>>>>> 2) KafkaIO#write:
> > > >>>>>>
> > > >>>>>> .withEOS(numShards, sinkGroupId)
> > > >>>>>>
> > > >>>>>> But my application is not able to deliver messages to the output
> > > >>>>>> topic due to the checkpoints failing.
> > > >>>>>> I also reviewed the timeout and other time-sensitive parameters;
> > > >>>>>> those are high right now.
> > > >>>>>>
> > > >>>>>> I really appreciate your guidance on this. Thank you
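Putting the pieces from the thread together, the read/write wiring from steps 1) and 2) might look roughly like this. Topic names, bootstrap servers, the String key/value types, the shard count, and the raised `default.api.timeout.ms` value are all placeholders, and the consumer factory for SASL mentioned earlier is omitted; this is a sketch under those assumptions, not a drop-in configuration:

```java
// Sketch of the EOS setup described in this thread (placeholder names/values).
Map<String, Object> consumerUpdates = new HashMap<>();
consumerUpdates.put("enable.auto.commit", false);          // step 1: no auto-commit
consumerUpdates.put("default.api.timeout.ms", "180000");   // illustrative raised timeout

PCollection<KV<String, String>> records = pipeline.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")               // placeholder
        .withTopic("input-topic")                          // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(consumerUpdates)
        .withReadCommitted()                               // step 1: only committed records
        .commitOffsetsInFinalize()
        .withoutMetadata());

records.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("broker:9092")               // placeholder
        .withTopic("output-topic")                         // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(16, "my-sink-group"));                    // step 2: numShards, sinkGroupId
```

Note that with EOS the sink only commits records as part of a successful checkpoint, so as Jan points out above, no messages will reach the output topic until checkpointing is healthy.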