It's still in the topic because it's weeks away from the deletion threshold
(it's today's message and the topic has a 4 week retention).
So I assume the consumer just moves on to the next one.
As a test I've created this script:
https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
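
In short, the consumer part of the script does roughly this (a simplified
sketch, not the exact gist contents; the broker/group/topic names are just
placeholders and the random failure condition only stands in for however the
real script decides which records fail):

  import java.time.Duration
  import java.util.Properties
  import kotlin.random.Random
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.serialization.StringDeserializer

  // Hypothetical failure condition, just to make some records fail
  fun shouldFail(): Boolean = Random.nextInt(5) == 0

  fun main() {
      val props = Properties().apply {
          put("bootstrap.servers", "localhost:9092")
          put("group.id", "offset-test")
          put("enable.auto.commit", "false")   // commits are manual
          put("max.poll.records", "1")         // one record per poll
          put("key.deserializer", StringDeserializer::class.java.name)
          put("value.deserializer", StringDeserializer::class.java.name)
      }
      KafkaConsumer<String, String>(props).use { consumer ->
          consumer.subscribe(listOf("test-topic"))
          while (true) {   // run until killed, it's just a test
              try {
                  val records = consumer.poll(Duration.ofSeconds(10))
                  records.forEach {
                      println("Received message on key ${it.key()} with value ${it.value()}")
                      if (shouldFail()) {
                          throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
                      }
                  }
                  consumer.commitSync()   // only commit when nothing threw
              } catch (e: Exception) {
                  println("Error Processing message $e")
              }
          }
      }
  }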

After running it, this is what I get in the logs:

Received message on key 2 with value 1
Received message on key 2 with value 2
Received message on key 2 with value 3
Error Processing message java.lang.Exception: Thrown error on key 2 with
value 3
Received message on key 2 with value 4
Received message on key 2 with value 5
Received message on key 2 with value 6
Received message on key 2 with value 7
Received message on key 2 with value 8
Received message on key 2 with value 9
Received message on key 2 with value 10
Error Processing message java.lang.Exception: Thrown error on key 2 with
value 10
Received message on key 2 with value 11
Received message on key 2 with value 12
Received message on key 2 with value 13
Received message on key 2 with value 14
Received message on key 2 with value 15
Received message on key 2 with value 16
Error Processing message java.lang.Exception: Thrown error on key 2 with
value 16

It seems that the offset just automatically increases; with another client I
can also see all the values in the topic, so they haven't been deleted.

Shouldn't it have retried the message with value 3?
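
Or is the only option to seek back explicitly in the error handler, so that
the next poll returns the same record again? Something like this (just a
sketch, I haven't tried it yet):

  import org.apache.kafka.clients.consumer.Consumer
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import org.apache.kafka.common.TopicPartition

  // Rewind the partition so the next poll() returns the failed record again
  fun <K, V> rewindTo(consumer: Consumer<K, V>, failed: ConsumerRecord<K, V>) {
      consumer.seek(TopicPartition(failed.topic(), failed.partition()), failed.offset())
  }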

--
Alessandro Tagliapietra


On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io>
wrote:

> "I'm just saying that a message which processing throws an exception is
> gone within minutes."
>
> Is the message no longer in the topic or is your consumer group current
> offset just higher than the offset of the message in question?
>
> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > You mean the retention time of the topic? That's one month.
> > I'm just saying that a message whose processing throws an exception is
> > gone within minutes.
> >
> > I will handle duplicates better later; if we can't be sure that we don't
> > skip/lose these messages then it's useless to use Kafka.
> > That's why I'm wondering if there's something wrong with my code.
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
> >
> > > How long is your message retention set for? Perhaps you want to
> > > increase that to a large enough value.
> > >
> > > I have an almost identical use case, but I would strongly recommend
> > > that you handle duplicates, as they are due to your process (not a
> > > Kafka duplicate).
> > >
> > > Regards,
> > >
> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > I've disabled auto commit, so what I thought that code would do is:
> > > >
> > > >  - it fetches a message
> > > >  - it processes a message
> > > >  - if everything went fine it commits its offset
> > > >  - if there's an exception, it doesn't commit, so after the error it
> > > >    would just poll again and get the same message over and over
> > > >
> > > > Instead, it seems that the message has been skipped somehow.
> > > >
> > > > In this case what the processing is doing is:
> > > >  - opening new orders (with a start datetime and a UUID generated by
> > > >    the stream producing to the topic this consumer is consuming from)
> > > >  - closing previously created orders (by setting an end datetime
> > > >    using the ID stored in a store)
> > > >
> > > > And so far I've seen multiple exceptions, like updating a
> > > > non-existent order (meaning the create message didn't complete the
> > > > request and has been skipped) or creating one with the same ID
> > > > (meaning that the backend returned an error but had correctly created
> > > > the record in the database).
> > > > I would expect that the above code wouldn't just commit somehow, and
> > > > that the consumer would be stuck there indefinitely.
> > > >
> > > > Regarding ignoring/handling duplicates, that's definitely something
> > > > that will be done, but right now I'm implementing it this way so that
> > > > I can confirm that I don't lose messages if I don't manually commit
> > > > them, which doesn't seem to be the case.
> > > >
> > > > Any help is really appreciated
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > How are you managing your offset commits?
> > > > >
> > > > >  Also, if it's a duplicate record issue (it sounds like a database
> > > > >  entry issue to me), have you thought about ignoring/handling
> > > > >  duplicates?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > > > > tagliapietra.alessan...@gmail.com> wrote:
> > > > >
> > > > > > Hello everyone,
> > > > > >
> > > > > > I have a consumer that fetches messages from a topic; for each
> > > > > > message it makes an API call to our backend. To ensure that if a
> > > > > > message fails it tries to process that message again, I've set
> > > > > > max.poll.records to 1 and I have code like this:
> > > > > >
> > > > > >   consumer.subscribe(arrayListOf("orders"))
> > > > > >   while (!stopProcessing.get()) {
> > > > > >       try {
> > > > > >           val records = consumer.poll(Duration.ofHours(1))
> > > > > >           records.forEach {
> > > > > >               processRecord(it)
> > > > > >           }
> > > > > >           consumer.commitSync()
> > > > > >       } catch (e: Exception) {
> > > > > >           logger.error("Error processing order message", e)
> > > > > >           Sentry.capture(e)
> > > > > >           Thread.sleep(30000)
> > > > > >       }
> > > > > >   }
> > > > > >
> > > > > > Now, if a request fails because the backend complains about a
> > > > > > duplicate primary ID, then due to the nature of the error, trying
> > > > > > to insert the same thing would generate that same error over and
> > > > > > over again.
> > > > > > Instead it seems that after some retries the message is skipped
> > > > > > and it goes on to the next one.
> > > > > > What could be the reason? If in the next loop iteration it gets a
> > > > > > message from another partition, would it also commit the offset
> > > > > > of the other, failed partition?
> > > > > >
> > > > > > Thank you
> > > > > >
> > > > > > --
> > > > > > Alessandro Tagliapietra
> > > > > >
> > > > >
> > > >
> > >
> >
>
