[ 
https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793863#comment-17793863
 ] 

Andrea Cosentino commented on CAMEL-16181:
------------------------------------------

Can you try with 3.21.2? Also, this is an LTS, but we are focusing on Camel 4,
so it would be good to test the same scenario with Camel 4.0.3 and 4.2.0.

> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
>                 Key: CAMEL-16181
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16181
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 3.7.2
>            Reporter: Javier Holguera
>            Priority: Major
>             Fix For: 3.7.3, 3.8.0
>
>         Attachments: kafka-idempotent-repository.log
>
>
> The `KafkaIdempotentRepository` initialises its cache from the previous 
> entries in the pre-existing Kafka topic, with the following code:
>  
> {code:java}
> log.debug("Subscribing consumer to {}", topic);
> consumer.subscribe(Collections.singleton(topic));
> log.debug("Seeking to beginning");
> consumer.seekToBeginning(consumer.assignment());
>
> POLL_LOOP: while (running.get()) {
>   log.trace("Polling");
>   ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
>   if (consumerRecords.isEmpty()) {
>     // the first time this happens, we can assume that we have consumed all messages up to this point
>     log.trace("0 messages fetched on poll");
>     if (cacheReadyLatch.getCount() > 0) {
>       log.debug("Cache warmed up");
>       cacheReadyLatch.countDown();
>     }
>   }
> {code}
>  
> The problem with this code is:
>  # `consumer.subscribe` doesn't instantaneously assign partitions to the 
> consumer.
>  # When `consumer.seekToBeginning` is called, the operation doesn't do 
> anything because the consumer has no partitions yet (see [seekToBeginning 
> doesn't work without auto.offset.reset 
> (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3ccakwx9vumpliqtu9o0mpepaupszapw9lm91mwexvafwktgd3...@mail.gmail.com%3e]).
>  # When the first `consumer.poll` is later issued, it returns nothing, 
> triggering the sequence to *confirm the cache as ready when it isn't yet*. 
> That can cause upstream messages not to be correctly de-duplicated.
>
> The solution is:
>  # Use a different overload of `consumer.subscribe` that accepts an 
> implementation of `ConsumerRebalanceListener`.
>  # When partitions are assigned to the `consumer` instance, call 
> `seekToBeginning` there.
>  # Do an initial `poll(0)`, which will never return records but will force 
> the partition assignment process.
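
The ordering problem and the proposed fix described in the issue can be illustrated with a small, self-contained model. This is only a sketch: `LazyAssignConsumer` and `CacheWarmUp` are hypothetical stand-ins written for this example and are not the Kafka client or Camel API. The model assumes a single partition, a consumer whose default position is the end of the log, and an assigning poll that returns no records. In the real client, the equivalent hook would be `ConsumerRebalanceListener.onPartitionsAssigned`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Toy stand-in for a Kafka consumer (hypothetical, NOT the real KafkaConsumer API). */
class LazyAssignConsumer {
    private final List<String> log;                        // keys already in the topic, one partition
    private final Set<Integer> assignment = new HashSet<>();
    private Consumer<Set<Integer>> onAssigned = parts -> { };
    private boolean subscribed;
    private int position;                                  // assumption: starts at the end of the log

    LazyAssignConsumer(List<String> log) {
        this.log = log;
        this.position = log.size();
    }

    // Subscribing only records the intent; no partition is assigned yet.
    void subscribe() { subscribed = true; }

    void subscribe(Consumer<Set<Integer>> listener) {
        subscribed = true;
        onAssigned = listener;
    }

    Set<Integer> assignment() { return assignment; }

    // With an empty collection there is nothing to seek, so this is a no-op.
    void seekToBeginning(Collection<Integer> partitions) {
        if (partitions.contains(0)) {
            position = 0;
        }
    }

    List<String> poll() {
        if (subscribed && assignment.isEmpty()) {
            // The first poll performs the assignment and notifies the listener.
            assignment.add(0);
            onAssigned.accept(assignment);
            return Collections.emptyList();                // assumption: the assigning poll returns nothing
        }
        List<String> batch = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return batch;
    }
}

class CacheWarmUp {
    // Order used in the quoted code: seek before assignment, first empty poll means "ready".
    static Set<String> naive(LazyAssignConsumer c) {
        c.subscribe();
        c.seekToBeginning(c.assignment());                 // assignment is still empty here
        return drainUntilEmpty(c);
    }

    // Proposed order: seek inside the assignment callback, after an initial poll forces assignment.
    static Set<String> withListener(LazyAssignConsumer c) {
        c.subscribe(parts -> c.seekToBeginning(parts));
        c.poll();                                          // plays the role of the initial poll(0)
        return drainUntilEmpty(c);
    }

    private static Set<String> drainUntilEmpty(LazyAssignConsumer c) {
        Set<String> cache = new HashSet<>();
        List<String> batch;
        while (!(batch = c.poll()).isEmpty()) {
            cache.addAll(batch);
        }
        return cache;                                      // the cache is declared ready at this point
    }
}
```

With a topic holding three keys, `CacheWarmUp.naive` returns an empty cache in this model, while `CacheWarmUp.withListener` returns all three keys before the cache is declared ready.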



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
