Hello guys, me again

I was trying to debug the backpressure issue and I noticed
that even with shards = 16, not all tasks are receiving
messages (screenshot attached). Do you know of any potential causes and
solutions?

I really appreciate any help you can provide


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas <ruben.var...@metova.com> wrote:
>
> Hello again
>
> Thank you for all the suggestions.
>
> Unfortunately, if I set more shards than partitions, it throws this exception:
>
> "message": 
> "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> ids -> ToGBKResult ->
> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException\n\tat
> ..
> ..
> ..
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> 60000ms while awaiting AddOffsetsToTxn\n",
>
>
> Any other alternative? Thank you very much!
>
> Regards
>
> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > Hi,
> >
> > regarding aligned vs unaligned checkpoints I recommend reading [1], it
> > explains it quite well. Generally, I would prefer unaligned checkpoints
> > in this case.
> >
> > Another thing to consider is the number of shards of the EOS sink.
> > > Because of how the shards are distributed among workers, it might be a good
> > > idea to actually increase that to some number higher than the number of
> > > target partitions (e.g. targetPartitions * 10 or so). An additional thing
> > to consider is increasing maxParallelism of the pipeline (e.g. max value
> > is 32768), as it also affects how 'evenly' Flink assigns shards to
> > workers. You can check if the assignment is even using counters in the
> > sink operator(s).
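The uneven assignment described above can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions: it assumes Flink maps a key to a key group by hashing it modulo maxParallelism and then maps the key group to a subtask as keyGroup * parallelism / maxParallelism. The mix() function is a stand-in hash and is not guaranteed to match what Flink or the Beam Flink runner actually computes, and the parallelism and maxParallelism values are made up for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

public class ShardAssignmentSketch {

    // Stand-in bit mixer (the 32-bit murmur3 finalizer). Flink applies its own
    // murmur-style hash to the key's hash code; this is NOT guaranteed to match it.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Key -> key group, in the range [0, maxParallelism).
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(mix(keyHash), maxParallelism);
    }

    // Key group -> subtask index, in the range [0, parallelism).
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Distinct subtasks that receive at least one of the shard ids 0..numShards-1.
    static Set<Integer> busySubtasks(int numShards, int parallelism, int maxParallelism) {
        Set<Integer> busy = new TreeSet<>();
        for (int shard = 0; shard < numShards; shard++) {
            int keyGroup = keyGroupFor(Integer.hashCode(shard), maxParallelism);
            busy.add(subtaskFor(keyGroup, parallelism, maxParallelism));
        }
        return busy;
    }

    public static void main(String[] args) {
        int parallelism = 16;      // assumed number of sink subtasks
        int maxParallelism = 128;  // assumed value, chosen only for this sketch
        for (int shards : new int[] {16, 160}) {
            Set<Integer> busy = busySubtasks(shards, parallelism, maxParallelism);
            System.out.println(shards + " shards -> " + busy.size() + " of "
                    + parallelism + " subtasks busy: " + busy);
        }
    }
}
```

With as many shards as subtasks, hash collisions typically leave some subtasks without any shard, which is one plausible reason for idle tasks; more shards give the hash more chances to cover every subtask. The real distribution should still be verified with the sink counters mentioned above.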
> >
> >   Jan
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> >
> > On 6/19/24 05:15, Ruben Vargas wrote:
> > > Hello guys
> > >
> > > > I was able to get past that error.
> > >
> > > I had to set the consumer factory function
> > > .withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))
> > >
> > > This is because my cluster uses SASL authentication mechanism, and the
> > > small consumer created to fetch the topics metadata was throwing that
> > > error.
> > >
> > > > There are a couple of other things I noticed:
> > > >
> > > >   - Now I have a lot of backpressure. I assigned 3x the resources to the
> > > > cluster and even with that the backpressure is high. Any advice on
> > > > this? I already increased the shards to equal the number of partitions
> > > > of the destination topic.
> > >
> > > - I have an error where
> > > "State exists for shard mytopic-0, but there is no state stored with
> > > Kafka topic mytopic' group id myconsumergroup'
> > >
> > > The only way I found to recover from this error is to change the group
> > > name. Any other advice on how to recover from this error?
> > >
> > >
> > > Thank you very much for following this up!
> > >
> > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <ruben.var...@metova.com> 
> > > wrote:
> > >> Hello Jan
> > >>
> > >> Thanks for the suggestions
> > >>
> > >> Any benefit of using aligned vs unaligned?
> > >>
> > >>
> > > >> In the end I found one problem that was preventing Flink from doing
> > > >> the checkpointing. It was a DoFn that had some "non
> > > >> serializable" objects, so I made those transient and initialized them
> > > >> in setup.
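The fix described above (mark the non-serializable member transient and rebuild it in setup) can be sketched in plain Java. This is a stand-in, not Beam code: EnrichFn, Client, and the endpoint value are hypothetical names, and the setup() method only plays the role that a DoFn setup method would play on a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    // Hypothetical helper that is deliberately NOT Serializable.
    static class Client {
        final String endpoint;
        Client(String endpoint) { this.endpoint = endpoint; }
    }

    // Stand-in for a DoFn: it must survive Java serialization.
    static class EnrichFn implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String endpoint;    // plain config, serialized as usual
        private transient Client client;  // skipped by Java serialization

        EnrichFn(String endpoint) { this.endpoint = endpoint; }

        // Plays the role of a setup method: rebuild the client after deserialization.
        void setup() { client = new Client(endpoint); }

        String process(String element) { return element + "@" + client.endpoint; }
    }

    // Serialize and deserialize, as happens when a function is shipped to a worker.
    static EnrichFn roundTrip(EnrichFn fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (EnrichFn) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        EnrichFn copy = roundTrip(new EnrichFn("broker:9092"));
        copy.setup();  // without this call, client would still be null
        System.out.println(copy.process("event"));  // prints "event@broker:9092"
    }
}
```

If the client field were not transient, writeObject would fail with a NotSerializableException, which is the kind of error this change avoids.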
> > >>
> > >> Weird, because I usually was able to detect these kinds of errors just
> > >> running in the direct runner, or even in flink before enabling EOS.
> > >>
> > >>
> > >> Now I'm facing another weird issue
> > >>
> > >> org.apache.beam.sdk.util.UserCodeException:
> > >> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> > >> expired before the last committed offset for partitions
> > >> [behavioral-signals-6] could be determined. Try tuning
> > >> default.api.timeout.ms larger to relax the threshold.
> > >> at 
> > >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> > >> at 
> > >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> > >> Source)
> > >> at 
> > >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> > >>
> > >> I tried to extend the timeout and it didn't work, my shards are equal
> > >> to my number of partitions.
> > >>
> > >> I appreciate any kind of guidance
> > >>
> > >> Thanks.
> > >>
> > >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >>> I'd suggest:
> > >>>   a) use unaligned checkpoints, if possible
> > >>>
> > >>>   b) verify the number of buckets you use for EOS sink, this limits 
> > >>> parallelism [1].
> > >>>
> > >>> Best,
> > >>>
> > >>>   Jan
> > >>>
> > >>> [1] 
> > >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> > >>>
> > >>> On 6/18/24 09:32, Ruben Vargas wrote:
> > >>>
> > >>> Hello Lukavsky
> > >>>
> > >>> Thanks for your reply !
> > >>>
> > > >>> I thought it was due to backpressure, but I increased the resources of the
> > > >>> cluster and the problem still persists. More than that, data stops flowing
> > > >>> and the checkpoints still fail.
> > > >>>
> > > >>> I have configured checkpointing to run every minute. The timeout is
> > > >>> 1h. It is an aligned checkpoint.
> > >>>
> > > >>> On Tue, Jun 18, 2024 at 1:14 AM Jan Lukavský
> > > >>> <je...@seznam.cz> wrote:
> > > >>>> Hi Ruben,
> > > >>>>
> > > >>>> from the provided screenshot it seems to me that the pipeline is
> > > >>>> backpressured by the sink. Can you please share your checkpoint
> > >>>> configuration? Are you using unaligned checkpoints? What is the
> > >>>> checkpointing interval and the volume of data coming in from the 
> > >>>> source?
> > > >>>> With EOS, data is committed after the checkpoint; before that, the data is
> > > >>>> buffered in state, which makes the sink more resource intensive.
> > >>>>
> > >>>>    Jan
> > >>>>
> > >>>> On 6/18/24 05:30, Ruben Vargas wrote:
> > >>>>> Attached a better image of the console.
> > >>>>>
> > >>>>> Thanks!
> > >>>>>
> > >>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas 
> > >>>>> <ruben.var...@metova.com> wrote:
> > >>>>>> Hello guys
> > >>>>>>
> > > >>>>>> Wondering if any of you have experience enabling Exactly Once in
> > > >>>>>> KafkaIO with the Flink runner? I enabled it and now I'm facing an issue
> > > >>>>>> where all the checkpoints are failing. I cannot see any exception in
> > > >>>>>> the logs.
> > >>>>>>
> > > >>>>>> The Flink console only mentions this: "Asynchronous task checkpoint
> > > >>>>>> failed." I also noticed that some operators don't acknowledge the
> > > >>>>>> checkpoint (screenshot attached).
> > >>>>>>
> > >>>>>> I did this:
> > >>>>>>
> > >>>>>> 1) KafkaIO.Read:
> > >>>>>>
> > >>>>>> update consumer properties with enable.auto.commit = false
> > >>>>>> .withReadCommitted()
> > >>>>>> .commitOffsetsInFinalize()
> > >>>>>>
> > >>>>>> 2) KafkaIO#write:
> > >>>>>>
> > >>>>>> .withEOS(numShards, sinkGroupId)
> > >>>>>>
> > > >>>>>> But my application is not able to deliver messages to the output
> > > >>>>>> topic because the checkpoints are failing.
> > > >>>>>> I also reviewed the timeout and other time-sensitive parameters;
> > > >>>>>> those are set high right now.
> > >>>>>>
> > >>>>>> I really appreciate your guidance on this. Thank you

Attachment: tasks-utilization
Description: Binary data
