[ https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795447#comment-17795447 ]
Arseniy Tashoyan commented on CAMEL-16181:
------------------------------------------

Either your approach, or use `assign` instead of `subscribe` and set the consumer position to the beginning.

> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
>                 Key: CAMEL-16181
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16181
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 3.7.2
>            Reporter: Javier Holguera
>            Priority: Major
>             Fix For: 3.7.3, 3.8.0
>
>         Attachments: kafka-idempotent-repository.log
>
>
> The `KafkaIdempotentRepository` initialises its cache from the pre-existing Kafka topic containing previous entries, using the following code:
>
> {code:java}
> log.debug("Subscribing consumer to {}", topic);
> consumer.subscribe(Collections.singleton(topic));
> log.debug("Seeking to beginning");
> consumer.seekToBeginning(consumer.assignment());
>
> POLL_LOOP: while (running.get()) {
>     log.trace("Polling");
>     ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
>     if (consumerRecords.isEmpty()) {
>         // the first time this happens, we can assume that we have consumed all messages up to this point
>         log.trace("0 messages fetched on poll");
>         if (cacheReadyLatch.getCount() > 0) {
>             log.debug("Cache warmed up");
>             cacheReadyLatch.countDown();
>         }
>     }
>     // ... (rest of the loop elided in the issue)
> }
> {code}
>
> The problem with this code is:
> # `consumer.subscribe` doesn't instantaneously assign partitions to the consumer.
> # When `consumer.seekToBeginning` is called, the operation does nothing because the consumer has no partitions yet (see [seekToBeginning doesn't work without auto.offset.reset (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3ccakwx9vumpliqtu9o0mpepaupszapw9lm91mwexvafwktgd3...@mail.gmail.com%3e]).
> # When the first `consumer.poll` is later issued, it returns nothing, which triggers the code path that *confirms the cache as ready when it isn't yet*.
>
> This can cause upstream messages not to be correctly de-duplicated.
>
> The solution is:
> # Use a different overload of `consumer.subscribe` that accepts an implementation of `ConsumerRebalanceListener`.
> # When partitions are assigned to the `consumer` instance, call `seekToBeginning` there.
> # Do an initial `poll(0)`, which will not return records but will force the partition assignment process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
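The race described in the issue, and both remedies (the rebalance-listener fix proposed in the issue and the `assign` alternative from the comment), can be modelled without a broker. The sketch below is a self-contained Java simulation, not Camel or Kafka code: `FakeConsumer`, `pollUntilEmpty`, and the `warmUp*` methods are hypothetical names. It assumes a single-partition topic, and it assumes that a consumer with no explicit seek starts at the end of the log (roughly `auto.offset.reset=latest` with no committed offset). With a real `KafkaConsumer`, the callback would be `ConsumerRebalanceListener.onPartitionsAssigned`, passed via `subscribe(Collection<String>, ConsumerRebalanceListener)`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CacheWarmupSketch {

    /**
     * Hypothetical stand-in for a KafkaConsumer reading one single-partition topic.
     * Assumption: with no explicit seek, the position resets to the end of the log
     * (like auto.offset.reset=latest with no committed offset).
     */
    static final class FakeConsumer {
        private static final int BATCH = 2;          // records returned per poll
        private final List<String> log;
        private boolean subscribed;
        private boolean assigned;
        private Integer position;                    // null until a position is set
        private Runnable onPartitionsAssigned = () -> { };

        FakeConsumer(List<String> log) { this.log = log; }

        /** Like subscribe(topics, listener): records interest, assigns nothing yet. */
        void subscribe(Runnable onPartitionsAssigned) {
            this.subscribed = true;
            this.onPartitionsAssigned = onPartitionsAssigned;
        }

        /** Like assign(partitions): the partition is owned immediately. */
        void assign() { this.assigned = true; }

        Set<Integer> assignment() { return assigned ? Set.of(0) : Set.of(); }

        /** A no-op for partitions not currently assigned, as described in the issue. */
        void seekToBeginning(Set<Integer> partitions) {
            if (partitions.contains(0)) position = 0;
        }

        List<String> poll() {
            if (!assigned) {
                if (!subscribed) throw new IllegalStateException("neither subscribed nor assigned");
                assigned = true;                     // group assignment completes inside poll
                onPartitionsAssigned.run();          // rebalance callback fires here
                if (position == null) position = log.size();
                return List.of();                    // simplification: nothing fetched yet
            }
            if (position == null) position = log.size();
            int end = Math.min(log.size(), position + BATCH);
            List<String> batch = new ArrayList<>(log.subList(position, end));
            position = end;
            return batch;
        }
    }

    /** The warm-up loop: the first empty poll marks the cache as ready. */
    static Set<String> pollUntilEmpty(FakeConsumer consumer) {
        Set<String> cache = new HashSet<>();
        while (true) {
            List<String> batch = consumer.poll();
            if (batch.isEmpty()) return cache;       // "Cache warmed up"
            cache.addAll(batch);
        }
    }

    /** Original ordering: seekToBeginning runs before any partition is assigned. */
    static Set<String> warmUpBuggy(List<String> log) {
        FakeConsumer consumer = new FakeConsumer(log);
        consumer.subscribe(() -> { });
        consumer.seekToBeginning(consumer.assignment()); // empty set: does nothing
        return pollUntilEmpty(consumer);
    }

    /** Fix from the issue: seek in the assignment callback, then force assignment. */
    static Set<String> warmUpWithListener(List<String> log) {
        FakeConsumer consumer = new FakeConsumer(log);
        consumer.subscribe(() -> consumer.seekToBeginning(consumer.assignment()));
        consumer.poll();                             // plays the role of the initial poll(0)
        return pollUntilEmpty(consumer);
    }

    /** Alternative from the comment: assign directly, then seek to the beginning. */
    static Set<String> warmUpWithAssign(List<String> log) {
        FakeConsumer consumer = new FakeConsumer(log);
        consumer.assign();
        consumer.seekToBeginning(consumer.assignment());
        return pollUntilEmpty(consumer);
    }

    public static void main(String[] args) {
        List<String> log = List.of("id-1", "id-2", "id-3");
        System.out.println(warmUpBuggy(log).size());        // 0
        System.out.println(warmUpWithListener(log).size()); // 3
        System.out.println(warmUpWithAssign(log).size());   // 3
    }
}
```

In this model, removing the `consumer.poll()` line from `warmUpWithListener` makes it fail the same way as `warmUpBuggy`, because the poll that completes the assignment itself comes back empty and is mistaken for "caught up". That is the role of step 3 in the issue's proposed solution. The `assign` variant avoids the question entirely, since the partition is owned before `seekToBeginning` runs.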