It's still in the topic because it's nowhere near the deletion threshold (it's today's message and the topic has a 4-week retention). So I assume the consumer just moves on to the next one. As a test I've created this script: https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
After running this I get in the logs:

Received message on key 2 with value 1
Received message on key 2 with value 2
Received message on key 2 with value 3
Error Processing message
java.lang.Exception: Thrown error on key 2 with value 3
Received message on key 2 with value 4
Received message on key 2 with value 5
Received message on key 2 with value 6
Received message on key 2 with value 7
Received message on key 2 with value 8
Received message on key 2 with value 9
Received message on key 2 with value 10
Error Processing message
java.lang.Exception: Thrown error on key 2 with value 10
Received message on key 2 with value 11
Received message on key 2 with value 12
Received message on key 2 with value 13
Received message on key 2 with value 14
Received message on key 2 with value 15
Received message on key 2 with value 16
Error Processing message
java.lang.Exception: Thrown error on key 2 with value 16

It seems that the offset just automatically increases. With another client I also see all the values in the topic, so they're not deleted. Shouldn't it have retried the message with value 3?

--
Alessandro Tagliapietra


On Wed, Sep 25, 2019 at 6:19 PM Steve Howard <steve.how...@confluent.io> wrote:

> "I'm just saying that a message whose processing throws an exception is
> gone within minutes."
>
> Is the message no longer in the topic, or is your consumer group's current
> offset just higher than the offset of the message in question?
>
> On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > You mean the retention time of the topic? That's one month.
> > I'm just saying that a message whose processing throws an exception is
> > gone within minutes.
> >
> > I will handle duplicates better later; if we can't be sure that we don't
> > skip/lose these messages, then it's useless to use Kafka.
> > That's why I'm wondering whether there's something wrong with my code.
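[Editor's note: the skip in the log above matches how the consumer tracks two separate numbers: poll() advances an in-memory fetch position on every call, independent of the committed offset, so after the catch block the next poll() simply continues from where the last one stopped; the committed offset only matters after a rebalance or restart. A minimal toy model of the two counters (plain Java rather than the thread's Kotlin, and not the real Kafka client):]

```java
import java.util.List;

// Toy model of a single-partition consumer. `position` is the in-memory
// fetch position (advances on every poll); `committed` moves only on
// commitSync(). Skipping a commit does NOT rewind `position`.
class FakeConsumer {
    private final List<String> log;
    private int position = 0;   // advances on every poll()
    private int committed = 0;  // advances only on commitSync()

    FakeConsumer(List<String> log) { this.log = log; }

    String poll() {
        if (position >= log.size()) return null;
        return log.get(position++); // moves even if processing later fails
    }

    void commitSync() { committed = position; }

    // The rewind the real client exposes via seek(): go back to the
    // last committed offset so the failed record is redelivered.
    void seekToCommitted() { position = committed; }

    int committed() { return committed; }
}
```

Skipping commitSync() after a failure therefore changes nothing for the very next poll(); only an explicit rewind redelivers the record.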
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:
> >
> > > How long is your message retention set for? Perhaps you want to
> > > increase that to a large enough value.
> > >
> > > I have an almost identical use case, but I would strongly recommend
> > > that you handle duplicates, as they are due to your process (not Kafka
> > > duplicates).
> > >
> > > Regards,
> > >
> > > On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > I've disabled auto commit, so what I thought that code would do is:
> > > >
> > > > - it fetches a message
> > > > - it processes the message
> > > > - if everything went fine, it commits its offset
> > > > - if there's an exception, it doesn't commit, so after the error it
> > > >   would just poll again and get the same message over and over
> > > >
> > > > Instead, it seems that the message has been skipped somehow.
> > > >
> > > > In this case the processing is:
> > > > - opening new orders (with a start datetime and a UUID generated by
> > > >   the stream producing to the topic this consumer consumes from)
> > > > - closing previously created orders (by setting an end datetime
> > > >   using the ID stored in a store)
> > > >
> > > > So far I've seen multiple exceptions, like updating a non-existent
> > > > order (meaning the create message didn't complete the request and
> > > > has been skipped) or creating with the same ID (meaning that the
> > > > backend returned an error but correctly created the record in the
> > > > database).
> > > > I would expect that the above code wouldn't commit, and the consumer
> > > > would be stuck there indefinitely.
> > > >
> > > > Regarding ignoring/handling duplicates, that's definitely something
> > > > that will be done, but right now I'm implementing it this way so
> > > > that I can confirm that I don't lose messages if I don't manually
> > > > commit them, which doesn't seem to be the case.
> > > >
> > > > Any help is really appreciated.
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > How are you managing your offset commits?
> > > > >
> > > > > Also, if it's a duplicate record issue (sounds like a database
> > > > > entry to me), have you thought about ignoring/handling duplicates?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > > > > tagliapietra.alessan...@gmail.com> wrote:
> > > > >
> > > > > > Hello everyone,
> > > > > >
> > > > > > I have a consumer that fetches messages from a topic; for each
> > > > > > message it makes an API call to our backend. To ensure that a
> > > > > > failed message is tried again, I've set max.poll.records to 1
> > > > > > and I have code like this:
> > > > > >
> > > > > > consumer.subscribe(arrayListOf("orders"))
> > > > > > while (!stopProcessing.get()) {
> > > > > >     try {
> > > > > >         val records = consumer.poll(Duration.ofHours(1))
> > > > > >         records.forEach {
> > > > > >             processRecord(it)
> > > > > >         }
> > > > > >         consumer.commitSync()
> > > > > >     } catch (e: Exception) {
> > > > > >         logger.error("Error processing order message", e)
> > > > > >         Sentry.capture(e)
> > > > > >         Thread.sleep(30000)
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > Now, if a request fails because the backend complains about a
> > > > > > duplicate primary ID, then due to the nature of the error,
> > > > > > trying to insert the same thing would generate that same error
> > > > > > over and over again.
> > > > > > Instead, it seems that after some retries the message is
> > > > > > skipped and it goes on with the next one.
> > > > > > What could be the reason? If in the next loop iteration it gets
> > > > > > a message from another partition, would it also commit the
> > > > > > offset of the other, failed partition?
> > > > > >
> > > > > > Thank you
> > > > > >
> > > > > > --
> > > > > > Alessandro Tagliapietra
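[Editor's note: the retry-forever behavior the loop above was expected to have can be recovered by rewinding in the catch block; with the real client that means calling consumer.seek(partition, lastCommittedOffset) for each assigned partition before polling again. A toy sketch of that at-least-once loop (plain Java, not the Kafka client; the record values and failure rule are made up for illustration):]

```java
import java.util.ArrayList;
import java.util.List;

// Toy at-least-once loop: on failure, reset the fetch position to the
// last committed index instead of letting it advance. This is the
// analogue of calling consumer.seek() back to the committed offset in
// the catch block of the original Kotlin loop.
class AtLeastOnceLoop {
    // Processes `records` in order; record "3" fails until it has been
    // attempted `failUntilAttempt` times, simulating a transient error.
    static List<String> run(List<String> records, int failUntilAttempt) {
        List<String> processed = new ArrayList<>();
        int position = 0;   // what poll() would return next
        int committed = 0;  // moves only after successful processing
        int attempts = 0;   // attempts on the current record
        while (committed < records.size()) {
            String record = records.get(position++); // poll()
            try {
                attempts++;
                if (record.equals("3") && attempts < failUntilAttempt) {
                    throw new RuntimeException("backend rejected " + record);
                }
                processed.add(record);               // processRecord()
                committed = position;                // commitSync()
                attempts = 0;
            } catch (RuntimeException e) {
                position = committed;  // seek back: redeliver on next poll
            }
        }
        return processed;
    }
}
```

Note that rewinding redelivers the record, so processing can run more than once for the same message if the failure happened after a side effect (the duplicate-primary-ID case discussed above), which is why handling duplicates idempotently is still needed on top of this.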