Hello everyone,

I have a consumer that fetches messages from a topic and, for each message,
makes an API call to our backend. To make sure a failed message is
processed again, I've set max.poll.records to 1 and I have code like
this:

  consumer.subscribe(arrayListOf("orders"))
  while (!stopProcessing.get()) {
      try {
          val records = consumer.poll(Duration.ofHours(1))
          records.forEach {
              processRecord(it)
          }
          consumer.commitSync()
      } catch (e: Exception) {
          logger.error("Error processing order message", e)
          Sentry.capture(e)
          Thread.sleep(30000)
      }
  }

Now, if a request fails because the backend complains about a duplicate
primary ID, retrying the same insert should, by the nature of the error,
generate that same error over and over again.
Instead, it seems that after some retries the message is skipped and the
consumer goes on with the next one.
What could be the reason? If the next loop iteration gets a message from
another partition, would commitSync() also commit the offset of the
partition whose message failed?
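
A minimal sketch of what an explicit retry could look like, assuming the
skip comes from poll() having already advanced the consumer's in-memory
position past the failed record. On failure it seeks back to that record,
and on success it commits only that record's partition instead of calling
the no-argument commitSync(), which commits the current position of every
assigned partition. pollOnce and its process parameter are hypothetical
names, not part of the Kafka client:

```kotlin
import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: runs one poll/process cycle and returns true only if
// every polled record was processed and committed.
fun <K, V> pollOnce(
    consumer: Consumer<K, V>,
    timeout: Duration,
    process: (ConsumerRecord<K, V>) -> Unit
): Boolean {
    val records = consumer.poll(timeout)
    var allOk = true
    for (tp: TopicPartition in records.partitions()) {
        for (record in records.records(tp)) {
            try {
                process(record)
            } catch (e: Exception) {
                // poll() already moved the position past this record, so
                // rewind to it; the next poll() should return it again.
                consumer.seek(tp, record.offset())
                allOk = false
                break // stop this partition, keep going with the others
            }
            // Commit only this partition, up to the record just processed.
            consumer.commitSync(mapOf(tp to OffsetAndMetadata(record.offset() + 1)))
        }
    }
    return allOk
}
```

In the loop above, this would replace the poll/forEach/commitSync body,
with the sleep kept for the case where pollOnce returns false.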

Thank you

--
Alessandro Tagliapietra
