[ 
https://issues.apache.org/jira/browse/CAMEL-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arseniy Tashoyan updated CAMEL-20373:
-------------------------------------
    Description: 
The current implementation of _KafkaIdempotentRepository_ is initialized as 
follows (after the CAMEL-20218 fix):

- A separate thread, TopicPoller, is started. TopicPoller calls 
KafkaConsumer.poll() in a while-loop to retrieve the cached keys from 
the Kafka topic and populate the local in-memory cache.
- TopicPoller stops retrieving records from the Kafka topic when 
KafkaConsumer.poll() returns zero records. An empty result from poll() is 
treated as a signal that all records have been retrieved.
- The main thread waits for the TopicPoller thread to finish, but no longer 
than 30 seconds (the value is hardcoded).
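
The bounded wait in the last step can be illustrated with a minimal sketch. This is not the Camel source: the class BoundedWaitSketch, the method startAndWait and the use of a CountDownLatch are assumptions made for illustration, and Thread.sleep() stands in for the poll() loop.

```java
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a background loader plus a bounded wait in the caller.
final class BoundedWaitSketch {
    /**
     * Starts a background loader that needs loadMillis to finish, then waits
     * at most waitMillis for it. Returns true only if loading completed in time.
     */
    static boolean startAndWait(Set<String> cache, long loadMillis, long waitMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                Thread.sleep(loadMillis); // stands in for the poll() loop
                cache.add("k1");
                done.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.setDaemon(true);
        poller.start();
        try {
            // The caller gives up after waitMillis, whether or not the cache is complete.
            return done.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

When the wait expires first, the caller continues with whatever the cache contains at that moment, which may be nothing.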

This implementation can leave the local cache only partially initialized, for 
two reasons:
1. TopicPoller may not manage to consume all Kafka records within the 
30-second interval.
2. KafkaConsumer.poll() may return an empty record set even though the end of 
the Kafka topic has not been reached (this is possible).
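
The second reason can be reproduced without a broker. The sketch below is a simulation only: StubLog is a hypothetical stand-in for a consumer whose poll() results are scripted, and loadUntilEmptyPoll mirrors the stop-on-empty rule described above rather than the actual TopicPoller code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a consumer: poll() may return an empty batch
// before the end of the log is reached.
final class StubLog {
    private final List<List<String>> batches; // scripted poll() results, in order
    private final long endOffset;             // total number of records in the log
    private int next = 0;
    private long position = 0;                // records handed out so far

    StubLog(List<List<String>> batches) {
        this.batches = batches;
        long total = 0;
        for (List<String> b : batches) {
            total += b.size();
        }
        this.endOffset = total;
    }

    List<String> poll() {
        if (next >= batches.size()) {
            return List.of();
        }
        List<String> batch = batches.get(next++);
        position += batch.size();
        return batch;
    }

    long position() { return position; }
    long endOffset() { return endOffset; }
}

final class CacheLoadSketch {
    /** Stops on the first empty poll(), as in the behaviour described above. */
    static Set<String> loadUntilEmptyPoll(StubLog log) {
        Set<String> cache = new HashSet<>();
        while (true) {
            List<String> batch = log.poll();
            if (batch.isEmpty()) {
                break; // treated as "everything has been read"
            }
            cache.addAll(batch);
        }
        return cache;
    }

    public static void main(String[] args) {
        // The second poll() is empty although "k3" is still in the log.
        StubLog log = new StubLog(List.of(List.of("k1", "k2"), List.of(), List.of("k3")));
        Set<String> cache = loadUntilEmptyPoll(log);
        System.out.println(cache.size() + " of " + log.endOffset() + " keys loaded");
    }
}
```

With this script the loader stops at position 2 while the end offset is 3, so the key "k3" never reaches the cache.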

Hence, after an application restart, KafkaIdempotentRepository may fail to 
fully restore the local cache. The consumer will then re-consume input that 
was already processed, which causes duplicates.

h3. Proposed implementation

- Remove the asynchronous TopicPoller and retrieve all records from Kafka 
synchronously in KafkaIdempotentRepository.doStart().
- Read records from Kafka until the end offsets are reached. Do not rely on 
the condition "poll() returns empty record set".

Pseudocode:
{code:java}
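// partitionsFor() returns PartitionInfo; assign() and seekToBeginning() need TopicPartition
partitions = consumer.partitionsFor(topic).stream()
    .map(p -> new TopicPartition(p.topic(), p.partition()))
    .collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
endOffsets = consumer.endOffsets(partitions);
// isReachedOffsets(): consumer.position(tp) >= endOffsets.get(tp) for every partition
while (!isReachedOffsets(consumer, endOffsets)) {
  // poll() requires a timeout; the one-second value here is only illustrative
  consumerRecords = consumer.poll(Duration.ofSeconds(1));
  addToLocalCache(consumerRecords);
}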
{code}
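
The isReachedOffsets check in the pseudocode is left undefined. One possible shape is sketched below, under the assumption that the caller first collects the current position of each partition (with a real consumer, for example via KafkaConsumer.position(TopicPartition)) into a map. The class name EndOffsetCheck and the generic key type P are illustrative only.

```java
import java.util.Map;

// Hypothetical helper: compares per-partition positions with the end offsets
// captured before loading started.
final class EndOffsetCheck {
    /**
     * Returns true when every partition's position is at or past its end offset.
     * A partition missing from positions is treated as not yet caught up.
     */
    static <P> boolean isReachedOffsets(Map<P, Long> positions, Map<P, Long> endOffsets) {
        for (Map.Entry<P, Long> e : endOffsets.entrySet()) {
            Long position = positions.get(e.getKey());
            if (position == null || position < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Partition 0 holds five records, partition 1 is empty.
        Map<Integer, Long> end = Map.of(0, 5L, 1, 0L);
        System.out.println(isReachedOffsets(Map.of(0, 3L, 1, 0L), end)); // prints false
        System.out.println(isReachedOffsets(Map.of(0, 5L, 1, 0L), end)); // prints true
    }
}
```

An empty partition has an end offset of 0 and counts as caught up immediately, so an empty topic does not block startup. Records appended after endOffsets was captured are ignored by this check, which keeps the loop bounded.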



> KafkaIdempotentRepository may allow some duplicates after application restart
> -----------------------------------------------------------------------------
>
>                 Key: CAMEL-20373
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20373
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.22.0, 4.2.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
