[ https://issues.apache.org/jira/browse/CAMEL-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811382#comment-17811382 ]
Arseniy Tashoyan commented on CAMEL-20373:
------------------------------------------

If the proposal is accepted, I can submit my implementation in a PR.

> KafkaIdempotentRepository may allow some duplicates after application restart
> -----------------------------------------------------------------------------
>
>                 Key: CAMEL-20373
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20373
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.22.0, 4.2.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>
> The current implementation of _KafkaIdempotentRepository_ is initialized as
> follows (after the CAMEL-20218 fix):
> - A separate thread, _TopicPoller_, is started. It calls
> _KafkaConsumer.poll()_ in a while-loop to retrieve the cached keys
> from the Kafka topic and populate the local in-memory cache.
> - _TopicPoller_ stops retrieving records from the Kafka topic when
> _KafkaConsumer.poll()_ returns zero records. An empty result from _poll()_
> is treated as a signal that all records have been retrieved.
> - The main thread waits for the _TopicPoller_ thread to finish, but no longer
> than 30 seconds (the value is hardcoded).
> This implementation can leave the local cache partially initialized for the
> following reasons:
> 1. _TopicPoller_ does not manage to consume all records in the Kafka topic
> within the 30-second interval.
> 2. _KafkaConsumer.poll()_ returns an empty record set even though the
> consumer has not reached the end of the Kafka topic (this is possible).
> Hence, after an application restart, _KafkaIdempotentRepository_ may fail to
> restore the local cache completely. The consumer will then re-consume input
> that was already processed, which causes duplicates.
> h3. Proposed implementation
> - Remove the asynchronous _TopicPoller_ and retrieve all records from Kafka
> synchronously in _KafkaIdempotentRepository.doStart()_.
> - Read records from Kafka until the end offsets are reached. Do not rely on
> the condition "_poll()_ returns an empty record set".
> Pseudocode:
> {code:java}
> partitions = consumer.partitionsFor(topic)
> consumer.assign(partitions)
> consumer.seekToBeginning(partitions)
> // capture the end offsets once, before reading
> endOffsets = consumer.endOffsets(partitions)
> // keep polling until every partition has reached its end offset
> while(!isReachedOffsets(consumer, endOffsets)) {
>   consumerRecords = consumer.poll()
>   addToLocalCache(consumerRecords)
> }
> {code}
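
The pseudocode above leaves _isReachedOffsets_ undefined. Below is a minimal sketch of what such a check might look like; it is not the implementation proposed for the PR. To stay dependency-free it works on plain maps keyed by partition number, and the class name _EndOffsetCheck_ and the map-based signature are hypothetical. In real code the positions would presumably come from _KafkaConsumer.position(TopicPartition)_ and the end offsets from _KafkaConsumer.endOffsets(...)_, both keyed by _TopicPartition_.

```java
import java.util.Map;

// Hypothetical helper, for illustration only: it is not part of camel-kafka.
final class EndOffsetCheck {

    private EndOffsetCheck() {
    }

    /**
     * Returns true when every partition has been read up to its end offset.
     *
     * positions:  partition number -> offset of the next record to be fetched
     * endOffsets: partition number -> end offset captured before reading started
     *
     * A partition missing from positions is treated as being at offset 0
     * (an assumption of this sketch).
     */
    static boolean isReachedOffsets(Map<Integer, Long> positions,
                                    Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long position = positions.getOrDefault(entry.getKey(), 0L);
            if (position < entry.getValue()) {
                // At least one partition still has unread records.
                return false;
            }
        }
        return true;
    }
}
```

Because the end offsets are captured once before the loop starts, an empty _poll()_ result in the middle of the topic does not end the loop early, and an empty partition (end offset 0) counts as fully read from the start.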