The image was not correctly attached. Sending it again. Sorry.

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas <ruben.var...@metova.com> wrote:
>
> Hello guys, me again
>
> I was trying to debug the issue with the backpressure and I noticed
> that even if I set the shards = 16, not all tasks are receiving
> messages (attaching screenshot). Do you know of potential causes and
> solutions?
>
> I really appreciate any help you can provide
>
>
> Thank you very much!
>
> Regards.
>
>
> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas <ruben.var...@metova.com> wrote:
> >
> > Hello again
> >
> > Thank you for all the suggestions.
> >
> > Unfortunately if I put more shards than partitions it throws me this 
> > exception
> >
> > "message": 
> > "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> > ids -> ToGBKResult ->
> > PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> > to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> > (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> > FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> > java.lang.RuntimeException:
> > java.lang.reflect.InvocationTargetException\n\tat
> > ..
> > ..
> > ..
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> > java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> > org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> > 60000ms while awaiting AddOffsetsToTxn\n",
> >
> >
> > Any other alternative? Thank you very much!
> >
> > Regards
> >
> > On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > > Hi,
> > >
> > > regarding aligned vs unaligned checkpoints I recommend reading [1], it
> > > explains it quite well. Generally, I would prefer unaligned checkpoints
> > > in this case.
> > >
> > > > Another thing to consider is the number of shards of the EOS sink.
> > > > Because of how the shards are distributed among workers, it might be a
> > > > good idea to actually increase that to some number higher than the number
> > > > of target partitions (e.g. targetPartitions * 10 or so). An additional thing
> > > > to consider is increasing maxParallelism of the pipeline (e.g. the max value
> > > > is 32768), as it also affects how 'evenly' Flink assigns shards to
> > > > workers. You can check if the assignment is even using counters in the
> > > > sink operator(s).
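Editorial sketch of the point above, under stated assumptions: Flink maps each key to a key group (hash modulo maxParallelism) and then maps that key group to a subtask (keyGroup * parallelism / maxParallelism). The `mix` function below is only a stand-in for Flink's own hash, and the shard ids are plain integers rather than Beam's encoded keys, so the concrete counts will not match a real job. It is meant only to show why a small number of shards can leave some subtasks without any shard.

```java
import java.util.Map;
import java.util.TreeMap;

public class ShardSpread {
    // Stand-in bit mixer (assumption): Flink uses its own murmur-style hash here.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Key -> key group -> subtask index, following the two-step scheme described above.
    static int subtaskFor(int shardId, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(mix(Integer.hashCode(shardId)), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many shards land on each subtask; subtasks with no shard are absent.
    static Map<Integer, Integer> shardsPerSubtask(int numShards, int maxParallelism, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int shard = 0; shard < numShards; shard++) {
            counts.merge(subtaskFor(shard, maxParallelism, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 8;      // hypothetical value
        int maxParallelism = 128; // hypothetical value
        for (int shards : new int[] {8, 16, 80}) {
            Map<Integer, Integer> counts = shardsPerSubtask(shards, maxParallelism, parallelism);
            System.out.println(shards + " shards -> " + counts.size() + " of "
                + parallelism + " subtasks have work: " + counts);
        }
    }
}
```

Running it with different shard counts and maxParallelism values gives a rough feel for how uneven the spread can be; the counters in the real sink operators remain the authoritative check.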
> > >
> > >   Jan
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> > >
> > > On 6/19/24 05:15, Ruben Vargas wrote:
> > > > Hello guys
> > > >
> > > > > I was able to get past that error.
> > > >
> > > > I had to set the consumer factory function
> > > > .withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))
> > > >
> > > > > This is because my cluster uses the SASL authentication mechanism, and the
> > > > > small consumer created to fetch the topic metadata was throwing that
> > > > > error.
> > > >
> > > > > There are a couple of other things I noticed:
> > > > >
> > > > >   - Now I have a lot of backpressure. I assigned 3x the resources to the
> > > > > cluster and even with that the backpressure is high. Any advice on
> > > > > this? I already increased the shards to equal the number of partitions
> > > > > of the destination topic.
> > > >
> > > > - I have an error where
> > > > "State exists for shard mytopic-0, but there is no state stored with
> > > > Kafka topic mytopic' group id myconsumergroup'
> > > >
> > > > The only way I found to recover from this error is to change the group
> > > > name. Any other advice on how to recover from this error?
> > > >
> > > >
> > > > Thank you very much for following this up!
> > > >
> > > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <ruben.var...@metova.com> 
> > > > wrote:
> > > >> Hello Jan
> > > >>
> > > >> Thanks for the suggestions
> > > >>
> > > >> Any benefit of using aligned vs unaligned?
> > > >>
> > > >>
> > > > >> In the end I found one problem that was preventing Flink from doing
> > > > >> the checkpointing. It was a DoFn that had some "non
> > > > >> serializable" objects, so I made those transient and initialized them
> > > > >> in the setup method.
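Editorial sketch of the transient-plus-setup pattern mentioned above, using only the Java standard library. `Client` and `Enricher` are hypothetical stand-ins (not Beam classes); in a real Beam DoFn the `setup()` method would carry Beam's `@Setup` annotation. The assumption here is that the function object is shipped via Java serialization, so a non-serializable field has to be `transient` and rebuilt on the worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    // Hypothetical client that is NOT Serializable.
    static class Client {
        String send(String msg) { return "sent:" + msg; }
    }

    // Stand-in for a DoFn: the object itself must survive serialization.
    static class Enricher implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String endpoint;
        private transient Client client; // skipped by Java serialization

        Enricher(String endpoint) { this.endpoint = endpoint; }

        // Rebuilds the non-serializable state after the object has been shipped.
        void setup() { client = new Client(); }

        String process(String element) { return client.send(endpoint + "/" + element); }

        boolean hasClient() { return client != null; }
    }

    // Serializes and deserializes the function, as a runner would when shipping it.
    static Enricher roundTrip(Enricher fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Enricher) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Enricher original = new Enricher("svc");
        original.setup();                      // client exists locally...
        Enricher copy = roundTrip(original);   // ...but is dropped in transit
        System.out.println(copy.hasClient());  // false
        copy.setup();                          // worker-side initialization
        System.out.println(copy.process("a")); // sent:svc/a
    }
}
```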
> > > >>
> > > >> Weird, because I usually was able to detect these kinds of errors just
> > > >> running in the direct runner, or even in flink before enabling EOS.
> > > >>
> > > >>
> > > >> Now I'm facing another weird issue
> > > >>
> > > >> org.apache.beam.sdk.util.UserCodeException:
> > > >> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> > > >> expired before the last committed offset for partitions
> > > >> [behavioral-signals-6] could be determined. Try tuning
> > > >> default.api.timeout.ms larger to relax the threshold.
> > > >> at 
> > > >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> > > >> at 
> > > >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> > > >> Source)
> > > >> at 
> > > >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> > > >>
> > > > >> I tried to extend the timeout and it didn't work. My shards are equal
> > > > >> to my number of partitions.
> > > >>
> > > >> I appreciate any kind of guidance
> > > >>
> > > >> Thanks.
> > > >>
> > > >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
> > > >>> I'd suggest:
> > > >>>   a) use unaligned checkpoints, if possible
> > > >>>
> > > >>>   b) verify the number of buckets you use for EOS sink, this limits 
> > > >>> parallelism [1].
> > > >>>
> > > >>> Best,
> > > >>>
> > > >>>   Jan
> > > >>>
> > > >>> [1] 
> > > >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> > > >>>
> > > >>> On 6/18/24 09:32, Ruben Vargas wrote:
> > > >>>
> > > >>> Hello Lukavsky
> > > >>>
> > > >>> Thanks for your reply !
> > > >>>
> > > > >>> I thought it was due to backpressure, but I increased the resources of the
> > > > >>> cluster and the problem still persists. More than that, data stops flowing
> > > > >>> and the checkpoints still fail.
> > > >>>
> > > > >>> I have configured checkpoints to run every minute. The timeout is
> > > > >>> 1h. Checkpoints are aligned.
> > > >>>
> > > > >>> On Tue, Jun 18, 2024 at 1:14 AM Jan Lukavský
> > > > >>> <je...@seznam.cz> wrote:
> > > > >>>> Hi Ruben,
> > > > >>>>
> > > > >>>> from the provided screenshot it seems to me that the pipeline is
> > > > >>>> backpressured by the sink. Can you please share your checkpoint
> > > >>>> configuration? Are you using unaligned checkpoints? What is the
> > > >>>> checkpointing interval and the volume of data coming in from the 
> > > >>>> source?
> > > >>>> With EOS data is committed after checkpoint, before that, the data is
> > > >>>> buffered in state, which makes the sink more resource intensive.
> > > >>>>
> > > >>>>    Jan
> > > >>>>
> > > >>>> On 6/18/24 05:30, Ruben Vargas wrote:
> > > >>>>> Attached a better image of the console.
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas 
> > > >>>>> <ruben.var...@metova.com> wrote:
> > > >>>>>> Hello guys
> > > >>>>>>
> > > > >>>>>> Wondering if some of you have experience enabling Exactly Once in
> > > > >>>>>> KafkaIO with the Flink runner? I enabled it and now I'm facing an issue
> > > > >>>>>> where all the checkpoints are failing. I cannot see any exception
> > > > >>>>>> in the logs.
> > > >>>>>>
> > > > >>>>>> The Flink console only mentions this: "Asynchronous task checkpoint
> > > > >>>>>> failed." I also noticed that some operators don't acknowledge the
> > > > >>>>>> checkpointing (attached a screenshot).
> > > >>>>>>
> > > >>>>>> I did this:
> > > >>>>>>
> > > >>>>>> 1) KafkaIO.Read:
> > > >>>>>>
> > > >>>>>> update consumer properties with enable.auto.commit = false
> > > >>>>>> .withReadCommitted()
> > > >>>>>> .commitOffsetsInFinalize()
> > > >>>>>>
> > > >>>>>> 2) KafkaIO#write:
> > > >>>>>>
> > > >>>>>> .withEOS(numShards, sinkGroupId)
> > > >>>>>>
> > > > >>>>>> But my application is not able to deliver messages to the output
> > > > >>>>>> topic because the checkpoints are failing.
> > > > >>>>>> I also reviewed the timeout and other time-sensitive parameters;
> > > > >>>>>> those are high right now.
> > > >>>>>>
> > > >>>>>> I really appreciate your guidance on this. Thank you
