Consumer offset commits can be controlled automatically or manually. What
you are describing here is a problem in your own application. As I
previously recommended, you should not hold your application in an
infinite loop just to reprocess these failed messages.

Also, I would certainly not recommend stopping at a specific offset forever
as you are suggesting. Kafka is a service that delivers messages to you
(with consistency and speed); what the message contains and how you handle
it is entirely up to you. Using auto.offset.reset you can choose to start
either from the beginning or from the latest offset.
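
For illustration only, here is a minimal sketch (placeholder broker address,
group id and topic, not your exact setup) of a consumer that commits
manually and falls back to the earliest offset when the group has no
committed offset yet:

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.StringDeserializer
    import java.time.Duration
    import java.util.Properties

    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-id")
        // nothing is committed unless the application calls commitSync/commitAsync
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        // only used when the group has no committed offset: "earliest" or "latest"
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    }

    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf("orders"))
        val records = consumer.poll(Duration.ofSeconds(1))
        records.forEach { /* process the record */ }
        // the committed offset only moves when this succeeds
        consumer.commitSync()
    }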

Thanks,

On Thu, 26 Sep 2019 at 04:14, Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Ok I think I've found the problem
>
> looking at
>
> https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
> it says:
>
> > On each poll, consumer will try to use the last consumed offset as the
> > starting offset and fetch sequentially. The last consumed offset can be
> > manually set through seek(TopicPartition, long) or automatically set as
> > the last committed offset for the subscribed list of partitions
>
>
> so the part where it says "automatically set as the last committed offset"
> only applies when the consumer starts. When the consumer is not restarted,
> what happens is "consumer will try to use the last consumed offset", meaning
> it uses the last consumed offset whether or not you have committed it.
> So adapting my code to:
>
> >     while (true) {
> >         val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
> >         try {
> >             val records = consumer.poll(Duration.ofHours(1))
> >             records.forEach {
> >                 partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset() + 1)
> >                 println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
> >                 if (Random.nextInt(0, 100) > 80) {
> >                     throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
> >                 }
> >             }
> >             consumer.commitSync(partitionOffsetMap)
> >             println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
> >         } catch (e: Throwable) {
> >             println("Error Processing message, resetting offset $e")
> >             consumer.seek(partitionOffsetMap.keys.first(), partitionOffsetMap.values.first().offset() - 1)
> >             Thread.sleep(5000)
> >         }
> >     }
>
>
> makes everything work fine:
>
> > Received message on key 1 with value 1 and offset 0
> > Committed offset 1 on partition 0
> > Received message on key 1 with value 2 and offset 1
> > Committed offset 2 on partition 0
> > Received message on key 1 with value 3 and offset 2
> > Committed offset 3 on partition 0
> > Received message on key 1 with value 4 and offset 3
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
> > Received message on key 1 with value 4 and offset 3
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
> > Received message on key 1 with value 4 and offset 3
> > Committed offset 4 on partition 0
> > .......
> > Committed offset 19 on partition 0
> > Received message on key 1 with value 20 and offset 19
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 20
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 19 for partition topic-0
> > --------- RESTART ---------
> > Received message on key 1 with value 20 and offset 19
> > Committed offset 20 on partition 0
> > Received message on key 1 with value 21 and offset 20
> > Committed offset 21 on partition 0
> > ......
>
> As you can see, if an exception is caught it seeks back to the offset it
> had before the consume; otherwise it commits the next offset.
> In fact, value 4 is reprocessed multiple times without a restart, and value
> 20, which throws an exception, is reprocessed after a restart because
> nothing had been committed.
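>
> For reference: the .first() calls above only work because a single partition
> is in play in this test. A rough, untested variant of the same idea for a
> batch spanning several partitions (reusing the same consumer) could remember
> the first offset seen per partition and rewind there on failure; processRecord
> stands in for whatever the processing does:
>
>     while (true) {
>         val toCommit = mutableMapOf<TopicPartition, OffsetAndMetadata>()
>         val batchStart = mutableMapOf<TopicPartition, Long>()
>         try {
>             val records = consumer.poll(Duration.ofHours(1))
>             for (record in records) {
>                 val tp = TopicPartition(record.topic(), record.partition())
>                 // remember where this partition's batch started, for the rewind below
>                 if (tp !in batchStart) batchStart[tp] = record.offset()
>                 processRecord(record)  // may throw
>                 toCommit[tp] = OffsetAndMetadata(record.offset() + 1)
>             }
>             if (toCommit.isNotEmpty()) consumer.commitSync(toCommit)
>         } catch (e: Throwable) {
>             println("Error processing message, rewinding batch: $e")
>             // seek every touched partition back to where its batch started,
>             // so nothing uncommitted is skipped
>             batchStart.forEach { (tp, offset) -> consumer.seek(tp, offset) }
>             Thread.sleep(5000)
>         }
>     }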
>
> Now, Kafka gurus, is this the best way to achieve this? Wouldn't it be
> better to have a config like "enable.auto.advance.consume.offset" that
> disables the implicit behavior, so the consume offset advances automatically
> on consume when it's true and only on commit when it's false?
> Or is there already some config I've missed?
>
> Thanks
>
> --
> Alessandro Tagliapietra
>
>
> On Wed, Sep 25, 2019 at 7:55 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > The only way this works is if I don't catch the exception, let the
> > consumer crash and fully restart it.
> >
> > Maybe the consumer has an internal state that always gets updated when it
> > receives a message during poll?
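> >
> > If so, something like this (untested sketch) should show the in-memory
> > position moving ahead of the committed offset after a poll:
> >
> >     val tp = TopicPartition("topic", 0)
> >     consumer.poll(Duration.ofSeconds(1))
> >     // position() is the consumer's in-memory "next offset to fetch": it advances on every poll
> >     println("position  = ${consumer.position(tp)}")
> >     // committed() only changes when commitSync()/commitAsync() succeeds
> >     println("committed = ${consumer.committed(tp)?.offset()}")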
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> >
> > On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> >> If I change my code slightly trying to manually commit offsets at
> >> partition level
> >>
> >>     while (true) {
> >>         try {
> >>             val records = consumer.poll(Duration.ofHours(1))
> >>             val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
> >>             records.forEach {
> >>                 partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset())
> >>                 println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
> >>                 if (Random.nextInt(0, 100) > 80) {
> >>                     throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
> >>                 }
> >>             }
> >>             consumer.commitSync(partitionOffsetMap)
> >>             println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
> >>         } catch (e: Throwable) {
> >>             println("Error Processing message $e")
> >>             Thread.sleep(5000)
> >>         }
> >>     }
> >>
> >> it logs this
> >>
> >> Received message on key 2 with value 93 and offset 92
> >> Committed offset 92 on partition 2
> >> Received message on key 2 with value 94 and offset 93
> >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
> >> Received message on key 2 with value 95 and offset 94
> >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
> >> Received message on key 2 with value 96 and offset 95
> >> Committed offset 95 on partition 2
> >> Received message on key 2 with value 97 and offset 96
> >> Committed offset 96 on partition 2
> >> Received message on key 2 with value 98 and offset 97
> >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 98
> >>
> >> as you can see, the consumed offset keeps advancing even when the
> >> "Committed offset ..." line is never logged
> >>
> >> --
> >> Alessandro Tagliapietra
> >>
> >>
> >> On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >>> It's still in the topic, because the deletion threshold is weeks away
> >>> (it's today's message and the topic has a 4-week retention).
> >>> So I assume the consumer just moves on to the next one.
> >>> As a test I've created this test script
> >>> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
> >>>
> >>> After running this I get in the logs:
> >>>
> >>> Received message on key 2 with value 1
> >>> Received message on key 2 with value 2
> >>> Received message on key 2 with value 3
> >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
> >>> Received message on key 2 with value 4
> >>> Received message on key 2 with value 5
> >>> Received message on key 2 with value 6
> >>> Received message on key 2 with value 7
> >>> Received message on key 2 with value 8
> >>> Received message on key 2 with value 9
> >>> Received message on key 2 with value 10
> >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
> >>> Received message on key 2 with value 11
> >>> Received message on key 2 with value 12
> >>> Received message on key 2 with value 13
> >>> Received message on key 2 with value 14
> >>> Received message on key 2 with value 15
> >>> Received message on key 2 with value 16
> >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
> >>>
> >>> It seems that the offset just automatically increases; with another
> >>> client I can also see all the values in the topic, so they're not deleted.
> >>>
> >>> Shouldn't it have retried the message with value 3?
> >>>
> >>> --
> >>> Alessandro Tagliapietra
> >>>
> >>>
> >>> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io> wrote:
> >>>
> >>>> "I'm just saying that a message which processing throws an exception
> is
> >>>> gone within minutes."
> >>>>
> >>>> Is the message no longer in the topic or is your consumer group
> current
> >>>> offset just higher than the offset of the message in question?
> >>>>
> >>>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
> >>>> tagliapietra.alessan...@gmail.com> wrote:
> >>>>
> >>>> > You mean the retention time of the topic? That's one month.
> >>>> > I'm just saying that a message whose processing throws an exception is
> >>>> > gone within minutes.
> >>>> >
> >>>> > I will handle duplicates better later; if we can't be sure that these
> >>>> > messages aren't skipped or lost, then using Kafka is pointless.
> >>>> > That's why I'm wondering whether there's something wrong with my code.
> >>>> >
> >>>> > --
> >>>> > Alessandro Tagliapietra
> >>>> >
> >>>> >
> >>>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
> >>>> >
> >>>> > > How long is your message retention set for? Perhaps you want to
> >>>> > > increase it to a large enough value.
> >>>> > >
> >>>> > > I have an almost identical use case, but I would strongly recommend that
> >>>> > > you handle the duplicates, as they come from your process (they are not
> >>>> > > Kafka duplicates).
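> >>>> > >
> >>>> > > For example (just a sketch of the idea, with hypothetical names for your
> >>>> > > backend call and its duplicate-key error), treating the duplicate error as
> >>>> > > "already processed" makes the consumer effectively idempotent:
> >>>> > >
> >>>> > >     try {
> >>>> > >         backend.createOrder(order)           // hypothetical API call
> >>>> > >     } catch (e: DuplicateKeyException) {     // hypothetical exception type
> >>>> > >         // the record was already applied in a previous attempt: safe to skip
> >>>> > >     }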
> >>>> > >
> >>>> > > Regards,
> >>>> > >
> >>>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> >>>> > > tagliapietra.alessan...@gmail.com> wrote:
> >>>> > >
> >>>> > > > I've disabled auto commit, so what I thought that code would do is:
> >>>> > > >
> >>>> > > >  - it fetches a message
> >>>> > > >  - it processes the message
> >>>> > > >  - if everything went fine, it commits its offset
> >>>> > > >  - if there's an exception, it doesn't commit, so after the error it
> >>>> > > >    would just poll again and get the same message over and over
> >>>> > > >
> >>>> > > > Instead, it seems that the message has been skipped somehow.
> >>>> > > >
> >>>> > > > In this case, the processing is:
> >>>> > > >  - opening new orders (with a start datetime and a UUID generated by
> >>>> > > >    the stream producing to the topic this consumer consumes from)
> >>>> > > >  - closing previously created orders (by setting an end datetime, using
> >>>> > > >    the ID stored in a store)
> >>>> > > >
> >>>> > > > And so far I've seen multiple exceptions, like updating a non-existent
> >>>> > > > order (meaning the create message didn't complete the request and has
> >>>> > > > been skipped) or creating an order with the same ID (meaning the backend
> >>>> > > > returned an error but had correctly created the record in the database).
> >>>> > > > I would expect that the above code wouldn't just commit somehow, and that
> >>>> > > > the consumer would be stuck there indefinitely.
> >>>> > > >
> >>>> > > > Regarding ignoring/handling duplicates, that's definitely something that
> >>>> > > > will be done, but right now I'm implementing it this way so that I can
> >>>> > > > confirm that I don't lose messages that I haven't manually committed,
> >>>> > > > which doesn't seem to be the case.
> >>>> > > >
> >>>> > > > Any help is really appreciated
> >>>> > > >
> >>>> > > > --
> >>>> > > > Alessandro Tagliapietra
> >>>> > > >
> >>>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> >>>> > > >
> >>>> > > > > Hi,
> >>>> > > > >
> >>>> > > > > How are you managing your offset commits?
> >>>> > > > >
> >>>> > > > > Also, if it's a duplicate record issue (it sounds like a database entry
> >>>> > > > > to me), have you thought about ignoring/handling duplicates?
> >>>> > > > >
> >>>> > > > > Thanks,
> >>>> > > > >
> >>>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> >>>> > > > > tagliapietra.alessan...@gmail.com> wrote:
> >>>> > > > >
> >>>> > > > > > Hello everyone,
> >>>> > > > > >
> >>>> > > > > > I have a consumer that fetches messages from a topic and, for each
> >>>> > > > > > message, makes an API call to our backend. To make sure a failed
> >>>> > > > > > message is retried, I've set max.poll.records to 1 and I have code
> >>>> > > > > > like this:
> >>>> > > > > >
> >>>> > > > > >   consumer.subscribe(arrayListOf("orders"))
> >>>> > > > > >   while (!stopProcessing.get()) {
> >>>> > > > > >       try {
> >>>> > > > > >           val records = consumer.poll(Duration.ofHours(1))
> >>>> > > > > >           records.forEach {
> >>>> > > > > >               processRecord(it)
> >>>> > > > > >           }
> >>>> > > > > >           consumer.commitSync()
> >>>> > > > > >       } catch (e: Exception) {
> >>>> > > > > >           logger.error("Error processing order message", e)
> >>>> > > > > >           Sentry.capture(e)
> >>>> > > > > >           Thread.sleep(30000)
> >>>> > > > > >       }
> >>>> > > > > >   }
> >>>> > > > > >
> >>>> > > > > > Now, if a request fails because the backend complains about a
> >>>> > > > > > duplicate primary ID, then by the nature of the error, trying to
> >>>> > > > > > insert the same thing would generate that same error over and over
> >>>> > > > > > again.
> >>>> > > > > > Instead, it seems that after some retries the message is skipped and
> >>>> > > > > > the consumer moves on to the next one.
> >>>> > > > > > What could be the reason? If in the next loop iteration it gets a
> >>>> > > > > > message from another partition, would it also commit the offset of
> >>>> > > > > > the other, failed partition?
> >>>> > > > > >
> >>>> > > > > > Thank you
> >>>> > > > > >
> >>>> > > > > > --
> >>>> > > > > > Alessandro Tagliapietra
> >>>> > > > > >
> >>>> > > > >
> >>>> > > >
> >>>> > >
> >>>> >
> >>>>
> >>>
>
