"I'm just saying that a message whose processing throws an exception is
gone within minutes."

Is the message no longer in the topic, or is your consumer group's current
offset just higher than the offset of the message in question?
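
If it is the latter, one possible cause in the loop quoted below is that
poll() advances the consumer's in-memory position even when nothing is
committed, so after an exception the next poll() returns the following
record, and the argument-less commitSync() then commits past the failed one.
A minimal sketch of a loop that rewinds on failure, assuming the standard
kafka-clients API (poll(Duration), seek, and commitSync with an explicit
offset map); consumeWithRetry is a hypothetical helper name, and
processRecord and stopProcessing stand in for the names in the original code:

```kotlin
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata

// Hypothetical helper (not from the thread): consume "orders" and never
// move past a record whose processing failed.
fun <K, V> consumeWithRetry(
    consumer: Consumer<K, V>,
    stopProcessing: AtomicBoolean,
    processRecord: (ConsumerRecord<K, V>) -> Unit
) {
    consumer.subscribe(listOf("orders"))
    while (!stopProcessing.get()) {
        // Short timeout so the stop flag is checked regularly (an assumption;
        // the original code polls for up to one hour).
        val records = consumer.poll(Duration.ofSeconds(1))
        for (tp in records.partitions()) {
            for (record in records.records(tp)) {
                val succeeded = try {
                    processRecord(record)
                    true
                } catch (e: Exception) {
                    false
                }
                if (succeeded) {
                    // Commit only this partition. The committed offset is the
                    // NEXT offset to read, hence the +1.
                    consumer.commitSync(
                        mapOf(tp to OffsetAndMetadata(record.offset() + 1))
                    )
                } else {
                    // poll() already moved the position past this record;
                    // rewind so the next poll() returns it again.
                    consumer.seek(tp, record.offset())
                    Thread.sleep(30_000)
                    break // drop the rest of this partition's batch
                }
            }
        }
    }
}
```

Committing per partition with an explicit offset also avoids committing
another partition's position as a side effect. The 30-second sleep should
stay well below max.poll.interval.ms, otherwise the consumer may be removed
from the group while it waits.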

On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> You mean the retention time of the topic? That's one month.
> I'm just saying that a message whose processing throws an exception is gone
> within minutes.
>
> I will handle duplicates better later, but if we can't be sure that we don't
> skip/lose these messages, then it's useless to use Kafka.
> That's why I'm wondering whether there's something wrong with my code.
>
> --
> Alessandro Tagliapietra
>
>
> On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
>
> > How long is your message retention set for? Perhaps you want to increase
> > that to a large enough value.
> >
> > I have an almost identical use case, but I would strongly recommend that
> > you handle duplicates, as they are due to your process (not Kafka
> > duplicates).
> >
> > Regards,
> >
> > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > I've disabled the auto commit, so what I thought that code would do is:
> > >
> > >  - it fetches a message
> > >  - it processes a message
> > >  - if everything went fine it commits its offset
> > > >  - if there's an exception, it didn't commit, so after the error it
> > > >    would just poll again and get the same message over and over
> > > >
> > > > Instead, it seems that the message has been skipped somehow.
> > >
> > > > In this case what the processing is doing is:
> > > >  - opening new orders (with a start datetime and a UUID generated by
> > > >    the stream producing to the topic this consumer is consuming from)
> > > >  - closing previously created orders (by setting an end datetime using
> > > >    the ID stored in a store)
> > >
> > > > And so far I've seen multiple exceptions, like updating a nonexistent
> > > > order (meaning the create message didn't complete the request and it
> > > > has been skipped) or creating with the same ID (meaning that the
> > > > backend returned an error but correctly created the record in the
> > > > database).
> > > > I would expect that the above code wouldn't just commit somehow and the
> > > > consumer would be stuck there indefinitely.
> > >
> > > > Regarding ignoring/handling duplicates, that's definitely something that
> > > > will be done, but right now I'm implementing it this way so that I can
> > > > confirm that I don't lose messages if I don't manually commit them,
> > > > which doesn't seem to be the case.
> > >
> > > Any help is really appreciated
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > > How are you managing your offset commits?
> > > > >
> > > > > Also, if it’s a duplicate record issue (sounds like a database entry
> > > > > to me), have you thought about ignoring/handling duplicates?
> > > >
> > > > Thanks,
> > > >
> > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > > > tagliapietra.alessan...@gmail.com> wrote:
> > > >
> > > > > Hello everyone,
> > > > >
> > > > > > I have a consumer that fetches messages from a topic; for each
> > > > > > message it makes an API call to our backend. To ensure that a
> > > > > > failed message is processed again, I've set max.poll.records to 1
> > > > > > and I have code like this:
> > > > >
> > > > >   consumer.subscribe(arrayListOf("orders"))
> > > > >   while (!stopProcessing.get()) {
> > > > >       try {
> > > > >           val records = consumer.poll(Duration.ofHours(1))
> > > > >           records.forEach {
> > > > >               processRecord(it)
> > > > >           }
> > > > >           consumer.commitSync()
> > > > >       } catch (e: Exception) {
> > > > >           logger.error("Error processing order message", e)
> > > > >           Sentry.capture(e)
> > > > >           Thread.sleep(30000)
> > > > >       }
> > > > >   }
> > > > >
> > > > > > Now, if a request fails because the backend complains about a
> > > > > > duplicate primary ID, then due to the nature of the error, trying to
> > > > > > insert the same thing would generate that same error over and over
> > > > > > again.
> > > > > > Instead, it seems that after some retries the message is skipped and
> > > > > > the consumer goes on with the next one.
> > > > > > What could be the reason? If the next loop iteration gets a message
> > > > > > from another partition, would it also commit the offset of the
> > > > > > failed partition?
> > > > >
> > > > > Thank you
> > > > >
> > > > > --
> > > > > Alessandro Tagliapietra
> > > > >
> > > >
> > >
> >
>
