Consumer offset commits can be controlled either automatically or manually. What you are describing here is a problem in your own application. As I recommended previously, you should not hold your application in an infinite loop just to process these failed messages.
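A minimal sketch (not code from this thread) of the consumer settings behind manual offset control. The helper name `manualCommitConsumerProps`, the String deserializers and the chosen values are assumptions for illustration; the property names themselves are standard Kafka consumer configs. Note that `auto.offset.reset` is only consulted when the group has no valid committed offset for a partition. It does not decide where a running consumer resumes after a failed record.

```kotlin
import java.util.Properties

// Sketch only: consumer settings for manually committed offsets.
// The helper name and the values are illustrative assumptions.
fun manualCommitConsumerProps(bootstrapServers: String, groupId: String): Properties =
    Properties().apply {
        setProperty("bootstrap.servers", bootstrapServers)
        setProperty("group.id", groupId)
        // Offsets are committed only when the application calls commitSync()/commitAsync().
        setProperty("enable.auto.commit", "false")
        // Used only when the group has no valid committed offset for a partition:
        // "earliest" starts from the beginning of the log, "latest" from the end.
        setProperty("auto.offset.reset", "earliest")
        // One record per poll(), as in the original question.
        setProperty("max.poll.records", "1")
        // Assumed String keys and values.
        setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    }
```

With kafka-clients on the classpath, the result can be passed directly to the consumer, e.g. `KafkaConsumer<String, String>(manualCommitConsumerProps("localhost:9092", "test-group-id"))`. Committed offsets then change only when the application commits them.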
Also, I would certainly not recommend stopping at a specific offset forever, as you are suggesting. Kafka is a service that delivers messages to you (with consistency and speed); what you do with each message, and how, is entirely up to you. With auto.offset.reset you can choose to start either from the beginning (earliest) or from the latest offset.

Thanks,

On Thu, 26 Sep 2019 at 04:14, Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> Ok, I think I've found the problem.
>
> Looking at
> https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
> it says:
>
> > On each poll, consumer will try to use the last consumed offset as the
> > starting offset and fetch sequentially. The last consumed offset can be
> > manually set through seek(TopicPartition, long) or automatically set as the
> > last committed offset for the subscribed list of partitions
>
> So the part where it says "automatically set as the last committed offset"
> only happens when the consumer starts. When the consumer is not restarted,
> what happens is "consumer will try to use the last consumed offset", meaning
> that it uses the last consumed offset whether or not you've committed it.
>
> So, adapting my code to:
>
> > while (true) {
> >     val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
> >     try {
> >         val records = consumer.poll(Duration.ofHours(1))
> >         records.forEach {
> >             partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset() + 1)
> >             println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
> >             if (Random.nextInt(0, 100) > 80) {
> >                 throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
> >             }
> >         }
> >         consumer.commitSync(partitionOffsetMap)
> >         println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
> >     } catch (e: Throwable) {
> >         println("Error Processing message, resetting offset $e")
> >         consumer.seek(partitionOffsetMap.keys.first(), partitionOffsetMap.values.first().offset() - 1)
> >         Thread.sleep(5000)
> >     }
> > }
>
> makes everything work fine:
>
> > Received message on key 1 with value 1 and offset 0
> > Committed offset 1 on partition 0
> > Received message on key 1 with value 2 and offset 1
> > Committed offset 2 on partition 0
> > Received message on key 1 with value 3 and offset 2
> > Committed offset 3 on partition 0
> > Received message on key 1 with value 4 and offset 3
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
> > Received message on key 1 with value 4 and offset 3
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 4
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 3 for partition topic-0
> > Received message on key 1 with value 4 and offset 3
> > Committed offset 4 on partition 0
> > .......
> > Committed offset 19 on partition 0
> > Received message on key 1 with value 20 and offset 19
> > Error Processing message, resetting offset java.lang.Exception: Thrown error on key 1 with value 20
> > [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=test-group-id] Seeking to offset 19 for partition topic-0
> > --------- RESTART ---------
> > Received message on key 1 with value 20 and offset 19
> > Committed offset 20 on partition 0
> > Received message on key 1 with value 21 and offset 20
> > Committed offset 21 on partition 0
> > ......
>
> As you can see, if an exception is caught, it resets the offset to the value
> it had before the consume; otherwise it commits the next offset.
> In fact, value 4 is reprocessed multiple times without a restart, and
> value 20, which throws an exception, is reprocessed after a restart because
> nothing has been committed.
>
> Now, Kafka gurus, is this the best way to achieve this? Wouldn't it be better
> to have a config like "enable.auto.advance.consume.offset" that disables the
> implicit behavior, so that the offset advances automatically on consume if
> it's true, or on commit if it's false?
> Or is there already some config I've missed?
>
> Thanks
>
> --
> Alessandro Tagliapietra
>
>
> On Wed, Sep 25, 2019 at 7:55 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>
> > The only way this works is if I don't catch the exception, let the
> > consumer crash and fully restart it.
> >
> > Maybe the consumer has an internal state that always gets updated when it
> > receives a message during poll?
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
> >
> >> If I change my code slightly, trying to manually commit offsets at
> >> partition level:
> >>
> >> while (true) {
> >>     try {
> >>         val records = consumer.poll(Duration.ofHours(1))
> >>         val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
> >>         records.forEach {
> >>             partitionOffsetMap[TopicPartition(it.topic(), it.partition())] = OffsetAndMetadata(it.offset())
> >>             println("Received message on key ${it.key()} with value ${it.value()} and offset ${it.offset()}")
> >>             if (Random.nextInt(0, 100) > 80) {
> >>                 throw Exception("Thrown error on key ${it.key()} with value ${it.value()}")
> >>             }
> >>         }
> >>         consumer.commitSync(partitionOffsetMap)
> >>         println("Committed offset ${partitionOffsetMap.values.first().offset()} on partition ${partitionOffsetMap.keys.first().partition()}")
> >>     } catch (e: Throwable) {
> >>         println("Error Processing message $e")
> >>         Thread.sleep(5000)
> >>     }
> >> }
> >>
> >> it logs this:
> >>
> >> Received message on key 2 with value 93 and offset 92
> >> Committed offset 92 on partition 2
> >> Received message on key 2 with value 94 and offset 93
> >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 94
> >> Received message on key 2 with value 95 and offset 94
> >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 95
> >> Received message on key 2 with value 96 and offset 95
> >> Committed offset 95 on partition 2
> >> Received message on key 2 with value 97 and offset 96
> >> Committed offset 96 on partition 2
> >> Received message on key 2 with value 98 and offset 97
> >> Error Processing message java.lang.Exception: Thrown error on key 2 with value 98
> >>
> >> As you can see, the offset increases even though the "Committed
> >> offset ...." line is not logged.
> >>
> >> --
> >> Alessandro Tagliapietra
> >>
> >>
> >> On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
> >>
> >>> It's still in the topic, because it's weeks away from the deletion
> >>> threshold (it's today's message, and retention is 4 weeks).
> >>> So I assume the consumer just moves on to the next one.
> >>> As a test I've created this script:
> >>> https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
> >>>
> >>> After running it I get this in the logs:
> >>>
> >>> Received message on key 2 with value 1
> >>> Received message on key 2 with value 2
> >>> Received message on key 2 with value 3
> >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 3
> >>> Received message on key 2 with value 4
> >>> Received message on key 2 with value 5
> >>> Received message on key 2 with value 6
> >>> Received message on key 2 with value 7
> >>> Received message on key 2 with value 8
> >>> Received message on key 2 with value 9
> >>> Received message on key 2 with value 10
> >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 10
> >>> Received message on key 2 with value 11
> >>> Received message on key 2 with value 12
> >>> Received message on key 2 with value 13
> >>> Received message on key 2 with value 14
> >>> Received message on key 2 with value 15
> >>> Received message on key 2 with value 16
> >>> Error Processing message java.lang.Exception: Thrown error on key 2 with value 16
> >>>
> >>> It seems that the offset just increases automatically; with another
> >>> client I also see all the values in the topic, so they're not deleted.
> >>>
> >>> Shouldn't it have retried the message with value 3?
> >>>
> >>> --
> >>> Alessandro Tagliapietra
> >>>
> >>>
> >>> On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io> wrote:
> >>>
> >>>> "I'm just saying that a message whose processing throws an exception is
> >>>> gone within minutes."
> >>>>
> >>>> Is the message no longer in the topic, or is your consumer group's current
> >>>> offset just higher than the offset of the message in question?
> >>>>
> >>>> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
> >>>>
> >>>> > You mean the retention time of the topic? That's one month.
> >>>> > I'm just saying that a message whose processing throws an exception is
> >>>> > gone within minutes.
> >>>> >
> >>>> > I will handle duplicates better later; if we can't be sure that we don't
> >>>> > skip/lose these messages, then it's useless to use Kafka.
> >>>> > That's why I'm wondering whether there's something wrong with my code.
> >>>> >
> >>>> > --
> >>>> > Alessandro Tagliapietra
> >>>> >
> >>>> >
> >>>> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
> >>>> >
> >>>> > > How long is your message retention set for? Perhaps you want to
> >>>> > > increase that to a large enough value.
> >>>> > >
> >>>> > > I have an almost identical use case, but I would strongly recommend
> >>>> > > that you handle duplicates, as they are due to your process (not
> >>>> > > Kafka duplicates).
> >>>> > >
> >>>> > > Regards,
> >>>> > >
> >>>> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
> >>>> > >
> >>>> > > > I've disabled auto commit, so what I thought that code would do is:
> >>>> > > >
> >>>> > > > - it fetches a message
> >>>> > > > - it processes the message
> >>>> > > > - if everything went fine, it commits its offset
> >>>> > > > - if there's an exception, it doesn't commit, so after the error it
> >>>> > > >   would just poll again and get the same message over and over
> >>>> > > >
> >>>> > > > Instead, it seems that the message has been skipped somehow.
> >>>> > > >
> >>>> > > > In this case, what the processing does is:
> >>>> > > > - opening new orders (with a start datetime and a UUID generated by
> >>>> > > >   the stream producing to the topic this consumer is consuming from)
> >>>> > > > - closing previously created orders (by setting an end datetime,
> >>>> > > >   using the ID stored in a store)
> >>>> > > >
> >>>> > > > So far I've seen multiple exceptions, like updating a non-existent
> >>>> > > > order (meaning the create message didn't complete the request and has
> >>>> > > > been skipped) or creating with the same ID (meaning that the backend
> >>>> > > > returned an error but correctly created the record in the database).
> >>>> > > > I would expect that the above code wouldn't just commit somehow, and
> >>>> > > > that the consumer would be stuck there indefinitely.
> >>>> > > >
> >>>> > > > Regarding ignoring/handling duplicates, that's definitely something
> >>>> > > > that will be done, but right now I'm implementing it this way so that
> >>>> > > > I can confirm that I don't lose messages if I don't manually commit
> >>>> > > > them, which doesn't seem to be the case.
> >>>> > > >
> >>>> > > > Any help is really appreciated.
> >>>> > > >
> >>>> > > > --
> >>>> > > > Alessandro Tagliapietra
> >>>> > > >
> >>>> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> >>>> > > >
> >>>> > > > > Hi,
> >>>> > > > >
> >>>> > > > > How are you managing your offset commits?
> >>>> > > > >
> >>>> > > > > Also, if it's a duplicate record issue (sounds like a database
> >>>> > > > > entry to me), have you thought about ignoring/handling duplicates?
> >>>> > > > >
> >>>> > > > > Thanks,
> >>>> > > > >
> >>>> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
> >>>> > > > >
> >>>> > > > > > Hello everyone,
> >>>> > > > > >
> >>>> > > > > > I have a consumer that fetches messages from a topic, and for each
> >>>> > > > > > message it makes an API call to our backend. To make sure that a
> >>>> > > > > > failed message is processed again, I've set max.poll.records to 1
> >>>> > > > > > and I have code like this:
> >>>> > > > > >
> >>>> > > > > > consumer.subscribe(arrayListOf("orders"))
> >>>> > > > > > while (!stopProcessing.get()) {
> >>>> > > > > >     try {
> >>>> > > > > >         val records = consumer.poll(Duration.ofHours(1))
> >>>> > > > > >         records.forEach {
> >>>> > > > > >             processRecord(it)
> >>>> > > > > >         }
> >>>> > > > > >         consumer.commitSync()
> >>>> > > > > >     } catch (e: Exception) {
> >>>> > > > > >         logger.error("Error processing order message", e)
> >>>> > > > > >         Sentry.capture(e)
> >>>> > > > > >         Thread.sleep(30000)
> >>>> > > > > >     }
> >>>> > > > > > }
> >>>> > > > > >
> >>>> > > > > > Now, if a request fails because the backend complains about a
> >>>> > > > > > duplicate primary ID, then, due to the nature of the error, trying
> >>>> > > > > > to insert the same thing would generate that same error over and
> >>>> > > > > > over again.
> >>>> > > > > > Instead, it seems that after some retries the message is skipped
> >>>> > > > > > and it goes on with the next one.
> >>>> > > > > > What could be the reason? If in the next loop iteration it gets a
> >>>> > > > > > message from another partition, would it also commit the offset of
> >>>> > > > > > the other, failed partition?
> >>>> > > > > >
> >>>> > > > > > Thank you
> >>>> > > > > >
> >>>> > > > > > --
> >>>> > > > > > Alessandro Tagliapietra
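The behaviour reported in this thread comes from the consumer keeping two separate numbers per partition: the fetch position, which `poll()` advances as soon as it returns records, and the committed offset, which changes only on commit and is where a freshly started consumer resumes. The sketch below is a toy, in-memory model of a single partition, not the Kafka client API: `PartitionModel` and `consume` are hypothetical names, and making each listed value fail only on its first attempt is an assumption that keeps the example finite. It reproduces both outcomes seen in the logs: without a seek the failed record is skipped, and with a seek back to its offset it is delivered again.

```kotlin
// Toy model of ONE partition, for illustration only. It is NOT the Kafka API;
// it mimics two pieces of per-partition state that a KafkaConsumer tracks separately.
class PartitionModel(private val log: List<String>) {
    // Fetch position: offset of the next record poll() returns.
    // It advances on every poll, whether or not anything is committed.
    var position = 0L
        private set

    // Committed offset: offset of the next record to read after a restart.
    // Changes only on commit (the model starts at 0, as if nothing was committed yet).
    var committed = 0L
        private set

    fun poll(): Pair<Long, String>? {
        if (position >= log.size) return null
        val offset = position
        position++ // advances even if processing later fails
        return offset to log[offset.toInt()]
    }

    // Kafka convention: commit the offset of the NEXT record, i.e. last processed + 1.
    fun commit(nextOffset: Long) { committed = nextOffset }

    // Counterpart of consumer.seek(TopicPartition, long): moves only the position.
    fun seek(offset: Long) { position = offset }

    // A freshly started consumer resumes from the committed offset.
    fun restart() { position = committed }
}

// Processes every record in the partition. Values in `failOnce` fail on their
// first attempt only. With seekOnFailure = false the failed record is skipped;
// with seekOnFailure = true the next poll returns it again.
fun consume(model: PartitionModel, failOnce: Set<String>, seekOnFailure: Boolean): List<String> {
    val processed = mutableListOf<String>()
    val attempted = mutableSetOf<String>()
    while (true) {
        val (offset, value) = model.poll() ?: break
        val firstAttempt = attempted.add(value)
        if (firstAttempt && value in failOnce) {
            // Processing failed: nothing is committed for this record.
            if (seekOnFailure) model.seek(offset) // rewind so the record is re-delivered
            continue
        }
        processed += value
        model.commit(offset + 1)
    }
    return processed
}
```

In real client code, the seek branch corresponds to `consumer.seek(TopicPartition, long)` with the failed record's offset, and a successful commit uses offset + 1, as in the adapted loop earlier in the thread. In the original loop, the no-argument `commitSync()` on the next successful iteration commits the consumer's current positions for all assigned partitions, which are already past the failed record, so the skip also survives a restart. If one poll returns records from several partitions, each partition with unprocessed records would need its own seek.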