You mean the retention time of the topic? That's one month. I'm just saying that a message whose processing throws an exception is gone within minutes.
I will handle duplicates better later. If we can't be sure that we don't skip or lose these messages, then it's useless to use Kafka.
That's why I'm wondering if there's something wrong with my code.

--
Alessandro Tagliapietra

On Wed, Sep 25, 2019 at 4:29 PM M. Manna <manme...@gmail.com> wrote:

> How long is your message retention set for? Perhaps you want to increase
> that to a large enough value.
>
> I have an almost identical use case, but I would strongly recommend that you
> handle duplicates, as they are due to your process (not Kafka duplicates).
>
> Regards,
>
> On Wed, 25 Sep 2019 at 22:37, Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > I've disabled the auto commit, so what I thought that code would do is:
> >
> > - it fetches a message
> > - it processes a message
> > - if everything went fine it commits its offset
> > - if there's an exception, it didn't commit, so after the error it would
> >   just poll again and get the same message over and over
> >
> > Instead it seems that message has been skipped somehow.
> >
> > In this case what the processing is doing is:
> > - opening new orders (with a start datetime and a UUID generated by the
> >   stream producing to the topic this consumer is consuming from)
> > - closing previously created orders (by setting an end datetime using the
> >   ID stored in a store)
> >
> > And so far I've seen multiple exceptions, like updating a nonexistent order
> > (meaning the create message didn't complete the request and it has been
> > skipped) or create with the same ID (meaning that the backend returned an
> > error but correctly created the record in the database).
> > I would expect that the above code wouldn't just commit somehow and the
> > consumer would be stuck there indefinitely.
> >
> > Regarding ignoring/handling duplicates, that's definitely something that
> > will be done, but right now I'm implementing it this way so that I can
> > confirm that I don't lose messages if I don't manually commit them, which
> > doesn't seem to be the case.
> >
> > Any help is really appreciated.
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Wed, Sep 25, 2019 at 2:20 PM M. Manna <manme...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > How are you managing your offset commits?
> > >
> > > Also, if it’s a duplicate record issue (sounds like a database entry to
> > > me), have you thought about ignoring/handling duplicates?
> > >
> > > Thanks,
> > >
> > > On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > Hello everyone,
> > > >
> > > > I have a consumer that fetches messages from a topic, and for each
> > > > message it makes an API call to our backend. To ensure that if a message
> > > > fails it tries again to process the message, I've set max.poll.records
> > > > to 1 and I have code like this:
> > > >
> > > > consumer.subscribe(arrayListOf("orders"))
> > > > while (!stopProcessing.get()) {
> > > >     try {
> > > >         val records = consumer.poll(Duration.ofHours(1))
> > > >         records.forEach {
> > > >             processRecord(it)
> > > >         }
> > > >         consumer.commitSync()
> > > >     } catch (e: Exception) {
> > > >         logger.error("Error processing order message", e)
> > > >         Sentry.capture(e)
> > > >         Thread.sleep(30000)
> > > >     }
> > > > }
> > > >
> > > > Now, if a request fails because the backend complains about a duplicate
> > > > primary ID, due to the nature of the error trying to insert the same
> > > > thing would generate that same error over and over again.
> > > > Instead it seems that after some retries the message is skipped and it
> > > > goes on with the next one.
> > > > What could be the reason?
> > > > If in the next loop iteration it gets a message from another partition,
> > > > would it also commit the offset of the other failed partition?
> > > >
> > > > Thank you
> > > >
> > > > --
> > > > Alessandro Tagliapietra
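
A note on the loop quoted above, offered as a likely explanation rather than something this thread confirms: poll() advances the consumer's in-memory position whether or not anything is committed, and the committed offset is only consulted when a consumer starts or a partition is reassigned. So after an exception the next poll() returns the following record, and the no-argument commitSync() then commits a position that is already past the failed one. The Kotlin sketch below shows one way to get the retry behaviour described in the thread, by seeking back to the failed record and committing explicit per-record offsets. pollAndProcess and its parameters are hypothetical names chosen for illustration; the Kafka calls used (poll, seek, commitSync with an offset map) come from the standard Java client, which is assumed to be on the classpath.

```kotlin
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import java.time.Duration

/**
 * Hypothetical helper (not from the thread): polls once, processes each record,
 * and commits only the offsets of records that were processed successfully.
 * Returns false if any record failed, so the caller can back off before retrying.
 */
fun <K, V> pollAndProcess(
    consumer: Consumer<K, V>,
    timeout: Duration,
    processRecord: (ConsumerRecord<K, V>) -> Unit
): Boolean {
    val records = consumer.poll(timeout)
    var allOk = true
    for (partition: TopicPartition in records.partitions()) {
        for (record in records.records(partition)) {
            try {
                processRecord(record)
            } catch (e: Exception) {
                // Rewind this partition so the next poll() returns the failed
                // record again instead of the one after it.
                consumer.seek(partition, record.offset())
                allOk = false
                break // skip the rest of this partition's batch; it is re-fetched later
            }
            // The committed offset is the offset of the NEXT record to read.
            consumer.commitSync(
                mapOf(partition to OffsetAndMetadata(record.offset() + 1))
            )
        }
    }
    return allOk
}
```

In the loop from the thread, the body of the try block could become a call such as pollAndProcess(consumer, Duration.ofHours(1)) { processRecord(it) }, with the Thread.sleep executed when it returns false. Committing after every record is slow in general; with max.poll.records set to 1 it amounts to one commit per poll.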