[ 
https://issues.apache.org/jira/browse/CAMEL-19972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Otavio Rodolfo Piske reopened CAMEL-19972:
------------------------------------------
      Assignee: Otavio Rodolfo Piske

> camel-kafka manual commit configuration inconsistency
> -----------------------------------------------------
>
>                 Key: CAMEL-19972
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19972
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.18.5, 3.21.1
>            Reporter: Sami Peltola
>            Assignee: Otavio Rodolfo Piske
>            Priority: Minor
>              Labels: help-wanted
>
> h2. Expected behavior
>  * It should not matter whether manual commits are enabled at component level or 
> at endpoint level: the consumer should not commit automatically when 
> autoCommitEnable is set to false
> h2. Actual behavior
> The behavior for manual commits is currently inconsistent (tested on Camel 
> versions 3.18.5 and 3.21.1), depending on whether manual commit is 
> configured at component level or at endpoint level
>  * If configured at component level, the consumer still automatically commits 
> offsets back to the broker (after records from a partition have been processed)
>  * If configured at endpoint level, the consumer never automatically commits offsets
>  * autoCommitEnable is
> h3. Configuration of manual commit for Kafka client during startup
> During startup of the Kafka component, the Camel context goes through the 
> following classes and methods while setting up the manual commit configuration.
> h4. KafkaComponent
> [https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java#L222]
> *Class:* org.apache.camel.component.kafka.KafkaComponent
> *Method:* doInit
>  
>  * When the component is being initialized, doInit checks whether 
> configuration.isAllowManualCommit() is true AND kafkaManualCommitFactory is 
> null
>  ** If both conditions hold, it initializes kafkaManualCommitFactory as 
> DefaultKafkaManualCommitFactory
>  ** If not, it leaves kafkaManualCommitFactory as null
>  
> h4. CommitManagers
> [https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java#L32]
> *Class:* org.apache.camel.component.kafka.consumer.CommitManagers
> *Method:* createCommitManager
>  * Called when initializing the consumer
>  ** For clarity's sake, Async-related logic is omitted here
>  ** If configuration.isAllowManualCommit() is true AND manualCommitFactory is 
> an instance of DefaultKafkaManualCommitFactory
>  *** Will initialize SyncCommitManager
>  ** Otherwise, will initialize NoopCommitManager
> h4. Summary
>  * Configuring manual commits on component level still results in automatic 
> commits when changing partitions (or when all records have been processed), 
> since *SyncCommitManager* is configured
>  ** allowManualCommit is true when calling *doInit* -> initializes 
> kafkaManualCommitFactory as DefaultKafkaManualCommitFactory -> initializes 
> commitManager as SyncCommitManager
>  * If allowManualCommit is only configured on endpoint level, it will be 
> false when invoking {*}doInit{*}, therefore leaving kafkaManualCommitFactory 
> as null, resulting in NoopCommitManager in *createCommitManager*
>  
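The two startup paths summarized above can be sketched in plain Java. This is only an illustration of the decision logic as the issue describes it, not the Camel source: the class CommitManagerSketch, its methods, and the string results are hypothetical stand-ins, and the sketch assumes that an endpoint sees allowManualCommit as true when it is set at either level. Async handling and the case where allowManualCommit is false everywhere are not modeled.

```java
public class CommitManagerSketch {

    // Hypothetical stand-in for the factory type named in the issue.
    static class DefaultKafkaManualCommitFactory {
    }

    // Mirrors the doInit check described above: the default factory is only
    // created when the COMPONENT-level flag is true and no factory is set yet.
    static Object initFactory(boolean componentAllowManualCommit, Object existingFactory) {
        if (componentAllowManualCommit && existingFactory == null) {
            return new DefaultKafkaManualCommitFactory();
        }
        return existingFactory;
    }

    // Mirrors the createCommitManager check described above (Async omitted).
    static String createCommitManager(boolean allowManualCommit, Object factory) {
        if (allowManualCommit && factory instanceof DefaultKafkaManualCommitFactory) {
            return "SyncCommitManager";
        }
        return "NoopCommitManager";
    }

    // Runs both steps for one configuration style.
    static String resolve(boolean componentLevel, boolean endpointLevel) {
        Object factory = initFactory(componentLevel, null);
        // Assumption: the endpoint configuration reports true if either level set it.
        boolean effectiveAllowManualCommit = componentLevel || endpointLevel;
        return createCommitManager(effectiveAllowManualCommit, factory);
    }

    public static void main(String[] args) {
        System.out.println("component level -> " + resolve(true, false));
        System.out.println("endpoint level  -> " + resolve(false, true));
    }
}
```

Under these assumptions the component-level case resolves to SyncCommitManager and the endpoint-level case to NoopCommitManager, which matches the inconsistency reported above.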
> h2. Workaround
> * Configure manual commits on endpoint level
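As a rough illustration of the workaround, the manual commit options can be placed directly on the endpoint URI instead of on the component. The helper below only assembles such a URI string; the class name, topic name, and broker address are hypothetical, while brokers, autoCommitEnable, and allowManualCommit are the camel-kafka endpoint options discussed in this issue.

```java
public class ManualCommitEndpointUri {

    // Builds a camel-kafka consumer URI with manual commit configured
    // at endpoint level. Topic and brokers are caller-supplied placeholders.
    static String endpointUri(String topic, String brokers) {
        return "kafka:" + topic
                + "?brokers=" + brokers
                + "&autoCommitEnable=false"
                + "&allowManualCommit=true";
    }

    public static void main(String[] args) {
        // Hypothetical topic and broker address, for illustration only.
        System.out.println(endpointUri("my-topic", "localhost:9092"));
    }
}
```

The resulting string would be passed to from(...) in a route definition, leaving the component itself without allowManualCommit set.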



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
