[ https://issues.apache.org/jira/browse/CAMEL-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811382#comment-17811382 ]

Arseniy Tashoyan commented on CAMEL-20373:
------------------------------------------

If the proposal is accepted, I can submit my implementation in a PR.

> KafkaIdempotentRepository may allow some duplicates after application restart
> -----------------------------------------------------------------------------
>
>                 Key: CAMEL-20373
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20373
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.22.0, 4.2.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>
> The current implementation of _KafkaIdempotentRepository_ is initialized as
> follows (after CAMEL-20218 was fixed):
> - It starts a separate thread, _TopicPoller_, which executes
> _KafkaConsumer.poll()_ in a while-loop to retrieve the cached keys from the
> Kafka topic and populate the local in-memory cache.
> - _TopicPoller_ stops retrieving records from the Kafka topic when
> _KafkaConsumer.poll()_ returns zero records. An empty result from _poll()_
> is treated as a signal that all records have been retrieved.
> - The main thread waits for the _TopicPoller_ thread to finish, but no longer
> than 30 seconds (the value is hardcoded).
> This implementation can leave the local cache only partially initialized,
> for the following reasons:
> 1. _TopicPoller_ may not manage to consume all records in the topic within
> the 30-second interval.
> 2. _KafkaConsumer.poll()_ may return an empty record set even though the end
> of the Kafka topic has not been reached (this is possible, e.g. when no data
> arrives before the poll timeout expires).
> Hence, after an application restart, _KafkaIdempotentRepository_ may fail to
> fully restore its local cache. The consumer will then re-consume input that
> has already been processed, which causes duplicates.
> h3. Proposed implementation
> - Remove the asynchronous _TopicPoller_ and retrieve all records from Kafka
> synchronously in _KafkaIdempotentRepository.doStart()_.
> - Read records from Kafka until the end offsets are reached. Do not rely on
> the condition "_poll()_ returns an empty record set".
> Pseudocode:
> {code:java}
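> // partitionsFor() returns PartitionInfo, so convert to TopicPartition for assign()
> List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
>     .map(info -> new TopicPartition(info.topic(), info.partition()))
>     .collect(Collectors.toList());
> consumer.assign(partitions);
> consumer.seekToBeginning(partitions);
> // Snapshot the end offsets once, before reading
> Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
> while (!isReachedOffsets(consumer, endOffsets)) {
>   // An empty batch is not treated as end-of-topic; the timeout value is illustrative
>   ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
>   addToLocalCache(consumerRecords);
> }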
> {code}
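To illustrate the difference between the two termination conditions, here is a self-contained Java simulation. It is a sketch, not Camel or Kafka code. _FakePartition_ is a hypothetical in-memory stand-in for a single topic partition. Its _poll()_ returns an empty batch on chosen calls, which mimics a _KafkaConsumer.poll()_ that times out before data arrives. _loadUntilEmptyPoll_ and _loadUntilEndOffset_ are likewise hypothetical names. With a real consumer, the end-offset check would compare _consumer.position(partition)_ for each assigned partition against the map returned by _consumer.endOffsets(partitions)_.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EndOffsetBootstrap {

    // Hypothetical in-memory stand-in for one topic partition (not a Kafka API).
    // poll() returns up to maxBatch records from the current position, but returns
    // an empty batch on the calls whose 0-based index is listed in emptyPolls,
    // mimicking a KafkaConsumer.poll() that times out before data arrives.
    static final class FakePartition {
        private final List<String> records;
        private final int maxBatch;
        private final Set<Integer> emptyPolls;
        private int position = 0;
        private int pollCount = 0;

        FakePartition(List<String> records, int maxBatch, Set<Integer> emptyPolls) {
            this.records = records;
            this.maxBatch = maxBatch;
            this.emptyPolls = emptyPolls;
        }

        long endOffset() {
            return records.size();
        }

        long position() {
            return position;
        }

        // Rewind to offset 0 and reset the call counter so that every loader
        // sees the same sequence of spurious empty polls.
        void seekToBeginning() {
            position = 0;
            pollCount = 0;
        }

        List<String> poll() {
            int call = pollCount++;
            if (emptyPolls.contains(call)) {
                return List.of(); // empty although records remain
            }
            int to = Math.min(records.size(), position + maxBatch);
            List<String> batch = new ArrayList<>(records.subList(position, to));
            position = to;
            return batch;
        }
    }

    // Current approach: treat the first empty poll as the end of the topic.
    static Set<String> loadUntilEmptyPoll(FakePartition partition) {
        partition.seekToBeginning();
        Set<String> cache = new HashSet<>();
        List<String> batch;
        while (!(batch = partition.poll()).isEmpty()) {
            cache.addAll(batch);
        }
        return cache;
    }

    // Proposed approach: snapshot the end offset, then poll until the position reaches it.
    static Set<String> loadUntilEndOffset(FakePartition partition) {
        partition.seekToBeginning();
        long endOffset = partition.endOffset();
        Set<String> cache = new HashSet<>();
        while (partition.position() < endOffset) {
            cache.addAll(partition.poll()); // an empty batch just means "poll again"
        }
        return cache;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("k0", "k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8", "k9");
        FakePartition partition = new FakePartition(keys, 3, Set.of(2));
        System.out.println(loadUntilEmptyPoll(partition).size()); // prints 6
        System.out.println(loadUntilEndOffset(partition).size()); // prints 10
    }
}
```

In this example there are ten keys, the batch size is three, and the third call returns a spurious empty poll. The empty-poll loop stops after six keys, so a restarted repository would accept four of the already-processed keys again. The end-offset loop keeps polling through the empty batch and loads all ten keys.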



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
