You mean the retention time of the topic? That's one month.
I'm just saying that a message whose processing throws an exception is gone
within minutes.

I will handle duplicates better later. If we can't be sure that we don't
skip/lose these messages, then there's no point in using Kafka at all.
That's why I'm wondering whether there's something wrong with my code.
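
For what it's worth, below is the untested seek-back variant I'd try next, in
case poll() advances the in-memory position regardless of commits (same setup
as my original code further down; assumes max.poll.records=1 and needs
org.apache.kafka.common.TopicPartition):

  consumer.subscribe(arrayListOf("orders"))
  while (!stopProcessing.get()) {
      val records = consumer.poll(Duration.ofHours(1))
      try {
          records.forEach { processRecord(it) }
          // commits the current position of every assigned partition
          consumer.commitSync()
      } catch (e: Exception) {
          logger.error("Error processing order message", e)
          Sentry.capture(e)
          // rewind each partition from this poll so the failed record is
          // fetched again instead of being silently passed over
          for (tp in records.partitions()) {
              consumer.seek(tp, records.records(tp).first().offset())
          }
          Thread.sleep(30000)
      }
  }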

--
Alessandro Tagliapietra


On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:

> How long is your message retention set for? Perhaps you want to increase
> it to a large enough value.
>
> I have an almost identical use case, but I would strongly recommend that you
> handle duplicates, as they are due to your process (not a Kafka duplicate).
>
> Regards,
>
> On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > I've disabled auto commit, so what I thought that code would do is:
> >
> >  - it fetches a message
> >  - it processes a message
> >  - if everything goes fine, it commits its offset
> >  - if there's an exception, it doesn't commit, so after the error it would
> > just poll again and get the same message over and over
> >
> > Instead, it seems the message has been skipped somehow.
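> >
> > To double-check, I might log the committed offset next to the in-memory
> > position after each failure (just a debug sketch on the same consumer;
> > partition 0 picked arbitrarily, needs org.apache.kafka.common.TopicPartition):
> >
> >   // debug: does the committed offset actually move past the failed record?
> >   val tp = TopicPartition("orders", 0)
> >   logger.info("committed={} position={}",
> >       consumer.committed(tp)?.offset(), consumer.position(tp))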
> >
> > In this case, what the processing does is:
> >  - opening new orders (with a start datetime and a UUID generated by the
> > stream producing to the topic this consumer is consuming from)
> >  - closing previously created orders (by setting an end datetime using the
> > ID stored in a store), roughly as sketched below
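> >
> > Sketching processRecord with made-up names (api, store and OrderEvent are
> > placeholders), just to show the shape of it:
> >
> >   fun processRecord(record: ConsumerRecord<String, OrderEvent>) {
> >       val event = record.value()
> >       when (event.type) {
> >           // hypothetical calls; the real thing is an HTTP request to our backend
> >           "open"  -> api.createOrder(event.orderId, event.startDatetime)
> >           "close" -> api.closeOrder(store.get(event.orderId), event.endDatetime)
> >       }
> >   }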
> >
> > And so far I've seen multiple exceptions, like updating a non-existent
> > order (meaning the create message didn't complete the request and has been
> > skipped) or a create with the same ID (meaning that the backend returned an
> > error but still created the record in the database).
> > I would expect the above code not to commit in those cases, leaving the
> > consumer stuck there indefinitely.
> >
> > Regarding ignoring/handling duplicates, that's definitely something that
> > will be done, but right now I'm implementing it this way so that I can
> > confirm that messages aren't lost when I don't manually commit them, which
> > doesn't seem to be the case. A rough sketch of the duplicate handling I
> > have in mind follows.
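> >
> > Something like this, with a hypothetical DuplicateKeyException from our
> > backend client:
> >
> >   try {
> >       api.createOrder(event.orderId, event.startDatetime)
> >   } catch (e: DuplicateKeyException) {
> >       // the order already exists, so treat the create as already done
> >       logger.warn("Order {} already exists, skipping create", event.orderId)
> >   }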
> >
> > Any help is really appreciated
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > How are you managing your offset commits?
> > >
> > > Also, if it's a duplicate record issue (sounds like a database entry issue
> > > to me), have you thought about ignoring/handling duplicates?
> > >
> > > Thanks,
> > >
> > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > Hello everyone,
> > > >
> > > > I have a consumer that fetches messages from a topic and, for each
> > > > message, makes an API call to our backend. To ensure that a failed
> > > > message gets retried, I've set max.poll.records to 1 and I have code
> > > > like this:
> > > >
> > > >   consumer.subscribe(arrayListOf("orders"))
> > > >   while (!stopProcessing.get()) {
> > > >       try {
> > > >           // max.poll.records=1, so this returns at most one record
> > > >           val records = consumer.poll(Duration.ofHours(1))
> > > >           records.forEach {
> > > >               processRecord(it)
> > > >           }
> > > >           // commit the offsets of everything returned by the last poll
> > > >           consumer.commitSync()
> > > >       } catch (e: Exception) {
> > > >           // on any failure: report it, back off, then poll again
> > > >           logger.error("Error processing order message", e)
> > > >           Sentry.capture(e)
> > > >           Thread.sleep(30000)
> > > >       }
> > > >   }
> > > >
> > > > Now, if a request fails because the backend complains about a duplicate
> > > > primary ID, then due to the nature of the error, trying to insert the
> > > > same thing would generate that same error over and over again.
> > > > Instead, it seems that after some retries the message is skipped and the
> > > > consumer goes on to the next one.
> > > > What could be the reason? If in the next loop iteration it gets a
> > > > message from another partition, would it also commit the offset of the
> > > > other, failed partition? If that's the case, I could commit per
> > > > partition explicitly, as sketched below.
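> > > >
> > > > A sketch of the explicit per-record commit (assuming max.poll.records=1;
> > > > needs TopicPartition and OffsetAndMetadata from the Kafka client):
> > > >
> > > >   val record = records.first()
> > > >   // commit only this record's partition, at the offset just past it
> > > >   consumer.commitSync(mapOf(
> > > >       TopicPartition(record.topic(), record.partition()) to
> > > >           OffsetAndMetadata(record.offset() + 1)
> > > >   ))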
> > > >
> > > > Thank you
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > >
> >
>
