The only way this works is if I don't catch the exception and instead let the
consumer crash and fully restart it.
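
(For reference, by "fully restart" I mean something like the sketch below: on any
error, close the consumer and build a new one, so that it resumes from the last
committed offsets. consumerProps is just a placeholder for my existing consumer
configuration, and process() stands in for my processRecord().)

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import java.time.Duration
    import java.util.Properties

    fun runWithRestart(consumerProps: Properties, process: (ConsumerRecord<String, String>) -> Unit) {
        while (true) {
            // build a fresh consumer; it starts again from the last committed offsets
            val consumer = KafkaConsumer<String, String>(consumerProps)
            try {
                consumer.subscribe(listOf("orders"))
                while (true) {
                    val records = consumer.poll(Duration.ofSeconds(1))
                    records.forEach { process(it) }
                    consumer.commitSync()
                }
            } catch (e: Exception) {
                println("Error processing message, recreating consumer: $e")
                Thread.sleep(5000)
            } finally {
                consumer.close()
            }
        }
    }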

Maybe the consumer keeps an internal position that always advances when it
receives a message during poll, regardless of whether the offset is committed?
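
If that's the case, I guess the fix on my side would be to seek() back to the
failed record so the next poll re-delivers it. A rough sketch (assuming
max.poll.records=1 as in my config, so each poll returns at most one record and
only one partition ever needs rewinding; process() again stands in for my
processRecord()):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition
    import java.time.Duration

    fun runWithSeekBack(consumer: KafkaConsumer<String, String>, process: (ConsumerRecord<String, String>) -> Unit) {
        while (true) {
            // assumes max.poll.records=1, so at most one record per poll
            val records = consumer.poll(Duration.ofSeconds(1))
            for (record in records) {
                val tp = TopicPartition(record.topic(), record.partition())
                try {
                    process(record)
                    // commit the offset of the *next* record to read, per the Kafka convention
                    consumer.commitSync(mapOf(tp to OffsetAndMetadata(record.offset() + 1)))
                } catch (e: Exception) {
                    // rewind the in-memory position so the next poll() returns this record again
                    consumer.seek(tp, record.offset())
                    Thread.sleep(5000)
                }
            }
        }
    }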

--
Alessandro Tagliapietra



On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> If I change my code slightly to manually commit offsets at the partition
> level:
>
>     while (true) {
>         try {
>             val records = consumer.poll(Duration.ofHours(1))
>             val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
>             records.forEach {
>                 partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset())
>                 println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
>                 if (Random.nextInt(0, 100) > 80) {
>                     throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
>                 }
>             }
>             consumer.commitSync(partitionOffsetMap)
>             println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
>         } catch (e: Throwable) {
>             println("Error Processing message $e")
>             Thread.sleep(5000)
>         }
>     }
>
> it logs this:
>
> Received message on key 2 with value 93 and offset 92
> Committed offset 92 on partition 2
> Received message on key 2 with value 94 and offset 93
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
> Received message on key 2 with value 95 and offset 94
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
> Received message on key 2 with value 96 and offset 95
> Committed offset 95 on partition 2
> Received message on key 2 with value 97 and offset 96
> Committed offset 96 on partition 2
> Received message on key 2 with value 98 and offset 97
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 98
>
> as you can see, the received offset keeps increasing even when the "Committed
> offset ..." line is never logged
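>
> To double-check that it's the consumer's in-memory position moving rather than
> the committed offset, I could also print both after each poll, something like
> this sketch (same consumer instance; the topic name and partition below are
> just placeholders matching the log above):
>
>     // "test-topic" is a placeholder for whatever topic the test uses
>     val tp = TopicPartition("test-topic", 2)
>     // position() is the consumer's in-memory fetch position,
>     // committed() is what is actually stored on the broker for this group
>     println("position=${consumer.position(tp)} committed=${consumer.committed(tp)?.offset()}")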
>
> --
> Alessandro Tagliapietra
>
>
> On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> It's still in the topic because it's weeks before the deletion threshold
>> (it's today's message and the topic has a 4-week retention).
>> So I assume the consumer just moves on to the next one.
>> As a test I've created this script:
>> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
>>
>> After running it, I get the following in the logs:
>>
>> Received message on key 2 with value 1
>> Received message on key 2 with value 2
>> Received message on key 2 with value 3
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
>> Received message on key 2 with value 4
>> Received message on key 2 with value 5
>> Received message on key 2 with value 6
>> Received message on key 2 with value 7
>> Received message on key 2 with value 8
>> Received message on key 2 with value 9
>> Received message on key 2 with value 10
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
>> Received message on key 2 with value 11
>> Received message on key 2 with value 12
>> Received message on key 2 with value 13
>> Received message on key 2 with value 14
>> Received message on key 2 with value 15
>> Received message on key 2 with value 16
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
>>
>> It seems that the offset just automatically increases. With another client
>> I also see all the values in the topic, so they're not deleted.
>>
>> Shouldn't it have retried the message with value 3?
>>
>> --
>> Alessandro Tagliapietra
>>
>>
>> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io>
>> wrote:
>>
>>> "I'm just saying that a message whose processing throws an exception is
>>> gone within minutes."
>>>
>>> Is the message no longer in the topic, or is your consumer group's current
>>> offset just higher than the offset of the message in question?
>>>
>>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
>>> tagliapietra.alessan...@gmail.com> wrote:
>>>
>>> > You mean the retention time of the topic? That's one month.
>>> > I'm just saying that a message whose processing throws an exception is
>>> > gone within minutes.
>>> >
>>> > I will handle duplicates better later; if we can't be sure that we don't
>>> > skip/lose these messages, then it's useless to use Kafka.
>>> > That's why I'm wondering if there's something wrong with my code.
>>> >
>>> > --
>>> > Alessandro Tagliapietra
>>> >
>>> >
>>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
>>> >
>>> > > How long is your message retention set for? Perhaps you want to
>>> > > increase that to a large enough value.
>>> > >
>>> > > I have an almost identical use case, but I would strongly recommend that
>>> > > you handle duplicates, as they are due to your process (not a Kafka
>>> > > duplicate).
>>> > >
>>> > > Regards,
>>> > >
>>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
>>> > > tagliapietra.alessan...@gmail.com> wrote:
>>> > >
>>> > > > I've disabled auto commit, so what I thought that code would do is:
>>> > > >
>>> > > >  - it fetches a message
>>> > > >  - it processes the message
>>> > > >  - if everything went fine, it commits its offset
>>> > > >  - if there's an exception, it doesn't commit, so after the error it
>>> > > >    would just poll again and get the same message over and over
>>> > > >
>>> > > > Instead, it seems that the message has been skipped somehow.
>>> > > >
>>> > > > In this case what the processing is doing is:
>>> > > >  - opening new orders (with a start datetime and a UUID generated by
>>> > > >    the stream producing to the topic this consumer is consuming from)
>>> > > >  - closing previously created orders (by setting an end datetime using
>>> > > >    the ID stored in a store)
>>> > > >
>>> > > > And so far I've seen multiple exceptions, like updating a non-existent
>>> > > > order (meaning the create message didn't complete the request and has
>>> > > > been skipped) or creating with the same ID (meaning that the backend
>>> > > > returned an error but had correctly created the record in the database).
>>> > > > I would expect that the above code wouldn't just commit somehow, and
>>> > > > that the consumer would be stuck there indefinitely.
>>> > > >
>>> > > > Regarding ignoring/handling duplicates, that's definitely something
>>> > > > that will be done, but right now I'm implementing it this way so that
>>> > > > I can confirm that I don't lose messages if I don't manually commit
>>> > > > them, which doesn't seem to be the case.
>>> > > >
>>> > > > Any help is really appreciated
>>> > > >
>>> > > > --
>>> > > > Alessandro Tagliapietra
>>> > > >
>>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com>
>>> wrote:
>>> > > >
>>> > > > > Hi,
>>> > > > >
>>> > > > > How are you managing your offset commits?
>>> > > > >
>>> > > > >  Also, if it's a duplicate record issue (sounds like a database entry
>>> > > > >  to me), have you thought about ignoring/handling duplicates?
>>> > > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
>>> > > > > tagliapietra.alessan...@gmail.com> wrote:
>>> > > > >
>>> > > > > > Hello everyone,
>>> > > > > >
>>> > > > > > I have a consumer that fetches messages from a topic; for each
>>> > > > > > message it makes an API call to our backend. To ensure that if a
>>> > > > > > message fails it tries to process the message again, I've set
>>> > > > > > max.poll.records to 1 and I have code like this:
>>> > > > > >
>>> > > > > >   consumer.subscribe(arrayListOf("orders"))
>>> > > > > >   while (!stopProcessing.get()) {
>>> > > > > >       try {
>>> > > > > >           val records = consumer.poll(Duration.ofHours(1))
>>> > > > > >           records.forEach {
>>> > > > > >               processRecord(it)
>>> > > > > >           }
>>> > > > > >           consumer.commitSync()
>>> > > > > >       } catch (e: Exception) {
>>> > > > > >           logger.error("Error processing order message", e)
>>> > > > > >           Sentry.capture(e)
>>> > > > > >           Thread.sleep(30000)
>>> > > > > >       }
>>> > > > > >   }
>>> > > > > >
>>> > > > > > Now, if a request fails because the backend complains about a
>>> > > > > > duplicate primary ID, then due to the nature of the error, trying
>>> > > > > > to insert the same thing would generate that same error over and
>>> > > > > > over again.
>>> > > > > > Instead, it seems that after some retries the message is skipped
>>> > > > > > and it moves on to the next one.
>>> > > > > > What could be the reason? If in the next loop iteration it gets a
>>> > > > > > message from another partition, would it also commit the offset of
>>> > > > > > the other, failed partition?
>>> > > > > >
>>> > > > > > Thank you
>>> > > > > >
>>> > > > > > --
>>> > > > > > Alessandro Tagliapietra
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
