Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory<Void,V>(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.

There are other couple things I noticed:

 - Now I have a lot of backpressure, I assigned x3 resources to the
cluster and even with that the back pressure is high . Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas <ruben.var...@metova.com> wrote:
>
> Hello Jan
>
> Thanks for the suggestions
>
> Any benefit of using aligned vs unaligned?
>
>
> At the end I found one problem that was preventing  flink from doing
> the checkpointing. It was a DoFn function that has some "non
> serializable" objects, so I made those transient and initialized those
> on the setup.
>
> Weird, because I usually was able to detect these kinds of errors just
> running in the direct runner, or even in flink before enabling EOS.
>
>
> Now I'm facing another weird issue
>
> org.apache.beam.sdk.util.UserCodeException:
> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> expired before the last committed offset for partitions
> [behavioral-signals-6] could be determined. Try tuning
> default.api.timeout.ms larger to relax the threshold.
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> at 
> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
>
> I tried to extend the timeout and it didn't work, my shards are equal
> to my number of partitions.
>
> I appreciate any kind of guidance
>
> Thanks.
>
> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > I'd suggest:
> >  a) use unaligned checkpoints, if possible
> >
> >  b) verify the number of buckets you use for EOS sink, this limits 
> > parallelism [1].
> >
> > Best,
> >
> >  Jan
> >
> > [1] 
> > https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> >
> > On 6/18/24 09:32, Ruben Vargas wrote:
> >
> > Hello Lukavsky
> >
> > Thanks for your reply !
> >
> > I thought was due backpreassure but i increased the resources of the 
> > cluster and problem still presist. More that that, data stop flowing and 
> > the checkpoint still fail.
> >
> > I have configured the checkpoint to do it per minute. The timeout is 1h. Is 
> > aligned checkpoint.
> >
> > El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský 
> > <je...@seznam.cz> escribió:
> >>
> >> H Ruben,
> >>
> >> from the provided screenshot it seems to me, that the pipeline in
> >> backpressured by the sink. Can you please share your checkpoint
> >> configuration? Are you using unaligned checkpoints? What is the
> >> checkpointing interval and the volume of data coming in from the source?
> >> With EOS data is committed after checkpoint, before that, the data is
> >> buffered in state, which makes the sink more resource intensive.
> >>
> >>   Jan
> >>
> >> On 6/18/24 05:30, Ruben Vargas wrote:
> >> > Attached a better image of the console.
> >> >
> >> > Thanks!
> >> >
> >> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas <ruben.var...@metova.com> 
> >> > wrote:
> >> >> Hello guys
> >> >>
> >> >> Wondering if some of you have experiences enabling Exactly Once in
> >> >> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
> >> >> where all the checkpoints are failing. I cannot see any exception on
> >> >> the logs.
> >> >>
> >> >> Flink console only mentions this "Asynchronous task checkpoint
> >> >> failed." I also noticed that some operators don't acknowledge the
> >> >> checkpointing  (Attached a screenshot).
> >> >>
> >> >> I did this:
> >> >>
> >> >> 1) KafkaIO.Read:
> >> >>
> >> >> update consumer properties with enable.auto.commit = false
> >> >> .withReadCommitted()
> >> >> .commitOffsetsInFinalize()
> >> >>
> >> >> 2) KafkaIO#write:
> >> >>
> >> >> .withEOS(numShards, sinkGroupId)
> >> >>
> >> >> But my application is not able to deliver messages to the output topic
> >> >> due the checkpoint failing.
> >> >> I also reviewed the timeout and other time sensitive parameters, those
> >> >> are high right now.
> >> >>
> >> >> I really appreciate your guidance on this. Thank you

Reply via email to