The only way this works is if I don't catch the exception, let the
consumer crash, and fully restart it.
Maybe the consumer has an internal state that always gets updated when
it receives a message during poll?

--
Alessandro Tagliapietra
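A note on that hypothesis: it matches the documented KafkaConsumer
behavior. poll() reads from an in-memory position that advances with
every record returned, independently of committed offsets; committed
offsets are only consulted when the consumer (re)starts or a rebalance
assigns the partition. To retry a failed record without restarting, the
position has to be rewound explicitly with seek(). A minimal sketch,
assuming max.poll.records=1 as in the original question (so each batch
holds a single record), with processRecord standing in for the backend
API call:

import java.time.Duration
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

while (true) {
    val records = consumer.poll(Duration.ofHours(1))
    for (record in records) {
        try {
            processRecord(record) // placeholder for the backend API call
            // Commit the *next* position to read, i.e. offset + 1.
            consumer.commitSync(mapOf(
                TopicPartition(record.topic(), record.partition()) to
                    OffsetAndMetadata(record.offset() + 1)
            ))
        } catch (e: Exception) {
            // Rewind the in-memory position so the next poll() returns
            // this same record again instead of moving past it.
            consumer.seek(TopicPartition(record.topic(), record.partition()), record.offset())
            Thread.sleep(5000)
            break // safe with max.poll.records=1: the batch has one record
        }
    }
}

With this pattern the consumer retries the failed record indefinitely
instead of silently moving past it, and without crashing the process.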
On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> If I change my code slightly, trying to manually commit offsets at the
> partition level
>
> while (true) {
>     try {
>         val records = consumer.poll(Duration.ofHours(1))
>         val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
>         records.forEach {
>             partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset())
>             println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
>             if (Random.nextInt(0, 100) > 80) {
>                 throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
>             }
>         }
>         consumer.commitSync(partitionOffsetMap)
>         println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
>     } catch (e: Throwable) {
>         println("Error Processing message $e")
>         Thread.sleep(5000)
>     }
> }
>
> it logs this
>
> Received message on key 2 with value 93 and offset 92
> Committed offset 92 on partition 2
> Received message on key 2 with value 94 and offset 93
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
> Received message on key 2 with value 95 and offset 94
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
> Received message on key 2 with value 96 and offset 95
> Committed offset 95 on partition 2
> Received message on key 2 with value 97 and offset 96
> Committed offset 96 on partition 2
> Received message on key 2 with value 98 and offset 97
> Error Processing message java.lang.Exception: Thrown error on key 2 with value 98
>
> as you can see, the offset increases even without logging the
> "Committed offset ...." part
>
> --
> Alessandro Tagliapietra
>
> On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> It's still in the topic because it's weeks away from the deletion
>> threshold (it's today's message and retention is four weeks).
>> So I assume the consumer just moves on to the next one.
>> As a test I've created this test script:
>> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
>>
>> After running this I get in the logs:
>>
>> Received message on key 2 with value 1
>> Received message on key 2 with value 2
>> Received message on key 2 with value 3
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
>> Received message on key 2 with value 4
>> Received message on key 2 with value 5
>> Received message on key 2 with value 6
>> Received message on key 2 with value 7
>> Received message on key 2 with value 8
>> Received message on key 2 with value 9
>> Received message on key 2 with value 10
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
>> Received message on key 2 with value 11
>> Received message on key 2 with value 12
>> Received message on key 2 with value 13
>> Received message on key 2 with value 14
>> Received message on key 2 with value 15
>> Received message on key 2 with value 16
>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
>>
>> It seems that the offset just automatically increases; with another
>> client I also see all the values in the topic, so they're not deleted.
>>
>> Shouldn't it have retried the message with value 3?
>>
>> --
>> Alessandro Tagliapietra
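A side note on the manual-commit snippet above, separate from the
skipping issue: commitSync expects the offset of the *next* record to
consume, so committing OffsetAndMetadata(it.offset()) instead of
it.offset() + 1 means the last committed record would be consumed a
second time after a restart or rebalance. A sketch of the corrected map
entry, reusing the names from that snippet:

// The committed value is the position of the *next* record to read,
// so add 1 to the offset of the record just processed.
partitionOffsetMap[TopicPartition(it.topic(), it.partition())] =
    OffsetAndMetadata(it.offset() + 1)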
>> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io>
>> wrote:
>>
>>> "I'm just saying that a message whose processing throws an exception
>>> is gone within minutes."
>>>
>>> Is the message no longer in the topic, or is your consumer group's
>>> current offset just higher than the offset of the message in question?
>>>
>>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
>>> tagliapietra.alessan...@gmail.com> wrote:
>>>
>>> > You mean the retention time of the topic? That's one month.
>>> > I'm just saying that a message whose processing throws an exception
>>> > is gone within minutes.
>>> >
>>> > I will handle duplicates better later; if we can't be sure that we
>>> > don't skip/lose these messages, then it's useless to use Kafka.
>>> > That's why I'm wondering if there's something wrong with my code.
>>> >
>>> > --
>>> > Alessandro Tagliapietra
>>> >
>>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
>>> >
>>> > > How long is your message retention set for? Perhaps you want to
>>> > > increase it to a large enough value.
>>> > >
>>> > > I have an almost identical use case, but I would strongly recommend
>>> > > that you handle the duplicates, since they are due to your process
>>> > > (not a Kafka duplicate).
>>> > >
>>> > > Regards,
>>> > >
>>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
>>> > > tagliapietra.alessan...@gmail.com> wrote:
>>> > >
>>> > > > I've disabled auto commit, so what I thought that code would do is:
>>> > > >
>>> > > > - it fetches a message
>>> > > > - it processes the message
>>> > > > - if everything went fine, it commits its offset
>>> > > > - if there was an exception, it didn't commit, so after the error
>>> > > >   it would just poll again and get the same message over and over
>>> > > >
>>> > > > Instead, it seems that the message has been skipped somehow.
>>> > > >
>>> > > > In this case what the processing does is:
>>> > > > - opening new orders (with a start datetime and a UUID generated
>>> > > >   by the stream producing to the topic this consumer consumes from)
>>> > > > - closing previously created orders (by setting an end datetime
>>> > > >   using the ID stored in a store)
>>> > > >
>>> > > > And so far I've seen multiple exceptions, like updating a
>>> > > > non-existent order (meaning the create message didn't complete the
>>> > > > request and has been skipped) or creating with the same ID (meaning
>>> > > > that the backend returned an error but correctly created the record
>>> > > > in the database).
>>> > > > I would expect that the above code wouldn't just commit somehow,
>>> > > > and that the consumer would be stuck there indefinitely.
>>> > > >
>>> > > > Regarding ignoring/handling duplicates, that's definitely something
>>> > > > that will be done, but right now I'm implementing it this way so
>>> > > > that I can confirm that I don't lose messages if I don't manually
>>> > > > commit them, which doesn't seem to be the case.
>>> > > >
>>> > > > Any help is really appreciated
>>> > > >
>>> > > > --
>>> > > > Alessandro Tagliapietra
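The expectation described in the email above is the misconception this
thread uncovers: with auto commit disabled, an uncommitted failure does
not make poll() re-deliver the record, because poll() reads from the
consumer's in-memory position rather than from the committed offset. A
small diagnostic sketch, assuming a consumer currently assigned the
partition from the earlier logs; position() and committed() are standard
KafkaConsumer methods:

// position() is where the next poll() will read from (in-memory);
// committed() is what the group has durably stored. After a failed,
// uncommitted batch the two diverge, which looks like "skipping".
val tp = TopicPartition("orders", 2) // example partition from the logs above
println("position=${consumer.position(tp)} committed=${consumer.committed(tp)?.offset()}")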
>>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
>>> > > >
>>> > > > > Hi,
>>> > > > >
>>> > > > > How are you managing your offset commits?
>>> > > > >
>>> > > > > Also, if it's a duplicate record issue (sounds like a database
>>> > > > > entry to me), have you thought about ignoring/handling duplicates?
>>> > > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
>>> > > > > tagliapietra.alessan...@gmail.com> wrote:
>>> > > > >
>>> > > > > > Hello everyone,
>>> > > > > >
>>> > > > > > I have a consumer that fetches messages from a topic and makes
>>> > > > > > an API call to our backend for each message. To ensure that a
>>> > > > > > failed message is retried, I've set max.poll.records to 1 and
>>> > > > > > I have code like this:
>>> > > > > >
>>> > > > > > consumer.subscribe(arrayListOf("orders"))
>>> > > > > > while (!stopProcessing.get()) {
>>> > > > > >     try {
>>> > > > > >         val records = consumer.poll(Duration.ofHours(1))
>>> > > > > >         records.forEach {
>>> > > > > >             processRecord(it)
>>> > > > > >         }
>>> > > > > >         consumer.commitSync()
>>> > > > > >     } catch (e: Exception) {
>>> > > > > >         logger.error("Error processing order message", e)
>>> > > > > >         Sentry.capture(e)
>>> > > > > >         Thread.sleep(30000)
>>> > > > > >     }
>>> > > > > > }
>>> > > > > >
>>> > > > > > Now, if a request fails because the backend complains about a
>>> > > > > > duplicate primary ID, trying to insert the same thing would, by
>>> > > > > > the nature of the error, generate that same error over and over
>>> > > > > > again.
>>> > > > > > Instead, it seems that after some retries the message is skipped
>>> > > > > > and the consumer moves on to the next one.
>>> > > > > > What could be the reason? If in the next loop iteration it gets
>>> > > > > > a message from another partition, would it also commit the
>>> > > > > > offset of the failed partition?
>>> > > > > >
>>> > > > > > Thank you
>>> > > > > >
>>> > > > > > --
>>> > > > > > Alessandro Tagliapietra
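On the closing question: yes. A bare commitSync() commits the consumer's
current position for every assigned partition, not just the partition of
the record last processed, so a successful iteration on one partition
can commit past a failed record on another. A sketch of the narrower
per-record commit, reusing the names from the loop above; it scopes the
commit to the record's own partition (in-process retry still needs the
seek() rewind shown at the top of the thread):

records.forEach {
    processRecord(it)
    // Commit only this record's partition, at offset + 1 (the next record to read).
    consumer.commitSync(mapOf(
        TopicPartition(it.topic(), it.partition()) to OffsetAndMetadata(it.offset() + 1)
    ))
}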