[ 
https://issues.apache.org/jira/browse/CAMEL-19972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sami Peltola updated CAMEL-19972:
---------------------------------
    Description: 
h2. Expected behavior
* Should not matter if manual commits are enabled component level or endpoint 
level, consumer should not commit automatically when autoCommitEnable is set to 
false

h2. Actual behavior

The behavior for manual commits is inconsistent at the moment (Test on Camel 
versions 3.18.5 and 3.21.1),  depending on whether the KafkaComponent is 
configured on component or endpoint level
* If configured on component level, will still automatically commit offsets 
back to broker (after records from a partition have been processed)
* If configured on endpoint level, will never automatically commit offsets
* autoCommitEnable is 


h3. Configuration of manual commit for Kafka client during startup

During startup for Kafka component, Camel context will go through the following 
classes and methods while setting up manual commit configuration.
h4. KafkaComponent

[https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java#L222]

*Class:* org.apache.camel.component.kafka.KafkaComponent

*Method:* doInit
 
 * When the component is being initialized, will check if 
configuration.isAllowManualCommit() is true AND kafkaManualCommitFactory is null
 ** If they are, will initialize kafkaManualCommitFactory as 
DefaultKafkaManualCommitFactory
 ** If not, will leave kafkaManualCommitFactory as null

 
h4. CommitManagers

[https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java#L32]

*Class:* org.apache.camel.component.kafka.consumer.CommitManagers

*Method:* createCommitManager
* Called when initializing Consumer
** For clarity's sake, omitting Async-related logic
** If configuration.isAllowManualCommit() is true AND manualCommitFactory is 
instance of DefaultKafkaManualCommitFactory
*** Will initialize SyncCommitManager
*** Otherwise, will initialize NoopCommitManager

h4. Summary

* Configuring manual commits on component level still results in automatic 
commits when the changing partitions (or all records processed) since 
*SyncCommitManager* is configured
** allowManualCommit is true when calling *doInit* -> initializes 
kafkaManualCommitFactory as DefaultKafkaManualCommitFactory -> initilize 
commitManager as SyncCommitManager
* If allowManualCommit is only configured on endpoint level, it will be false 
when invoking *doInit*, therefore leaving kafkaManualCommitFactory as null, 
resulting in NoopCommitManager in *createCommitManager*

* Configure manual commits on endpoint level


  was:
h2. Expected behavior
* Should not matter if manual commits are enabled component level or endpoint 
level, consumer should not commit automatically when autoCommitEnable is set to 
false

h2. Actual behavior

The behavior for manual commits is inconsistent at the moment (Test on Camel 
versions 3.18.5 and 3.21.1),  depending on whether the KafkaComponent is 
configured on component or endpoint level
* If configured on component level, will still automatically commit offsets 
back to broker (after records from a partition have been processed)
* If configured on endpoint level, will never automatically commit offsets
* autoCommitEnable is 


h3. Configuration of manual commit for Kafka client during startup

During startup for Kafka component, Camel context will go through the following 
classes and methods while setting up manual commit configuration.
h4. KafkaComponent

[https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java#L222]

*Class:* org.apache.camel.component.kafka.KafkaComponent

*Method:* doInit
 
 * When the component is being initialized, will check if 
configuration.isAllowManualCommit() is true AND kafkaManualCommitFactory is null
 ** If they are, will initialize kafkaManualCommitFactory as 
DefaultKafkaManualCommitFactory
 ** If not, will leave kafkaManualCommitFactory as null

 
h4. CommitManagers

[https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java#L32]

*Class:* org.apache.camel.component.kafka.consumer.CommitManagers

*Method:* createCommitManager
* Called when initializing Consumer
** For clarity's sake, omitting Async-related logic
** If configuration.isAllowManualCommit() is true AND manualCommitFactory is 
instance of DefaultKafkaManualCommitFactory
*** Will initialize SyncCommitManager
*** Otherwise, will initialize NoopCommitManager

h4. Summary

* Configuring manual commits on component level still results in automatic 
commits when the changing partitions (or all records processed) since 
*SyncCommitManager* is configured


h2. Workaround

* Configure manual commits on endpoint level



> camel-kafka manual commit configuration inconsistency
> -----------------------------------------------------
>
>                 Key: CAMEL-19972
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19972
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.18.5, 3.21.1
>            Reporter: Sami Peltola
>            Priority: Minor
>
> h2. Expected behavior
> * Should not matter if manual commits are enabled component level or endpoint 
> level, consumer should not commit automatically when autoCommitEnable is set 
> to false
> h2. Actual behavior
> The behavior for manual commits is inconsistent at the moment (Test on Camel 
> versions 3.18.5 and 3.21.1),  depending on whether the KafkaComponent is 
> configured on component or endpoint level
> * If configured on component level, will still automatically commit offsets 
> back to broker (after records from a partition have been processed)
> * If configured on endpoint level, will never automatically commit offsets
> * autoCommitEnable is 
> h3. Configuration of manual commit for Kafka client during startup
> During startup for Kafka component, Camel context will go through the 
> following classes and methods while setting up manual commit configuration.
> h4. KafkaComponent
> [https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java#L222]
> *Class:* org.apache.camel.component.kafka.KafkaComponent
> *Method:* doInit
>  
>  * When the component is being initialized, will check if 
> configuration.isAllowManualCommit() is true AND kafkaManualCommitFactory is 
> null
>  ** If they are, will initialize kafkaManualCommitFactory as 
> DefaultKafkaManualCommitFactory
>  ** If not, will leave kafkaManualCommitFactory as null
>  
> h4. CommitManagers
> [https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java#L32]
> *Class:* org.apache.camel.component.kafka.consumer.CommitManagers
> *Method:* createCommitManager
> * Called when initializing Consumer
> ** For clarity's sake, omitting Async-related logic
> ** If configuration.isAllowManualCommit() is true AND manualCommitFactory is 
> instance of DefaultKafkaManualCommitFactory
> *** Will initialize SyncCommitManager
> *** Otherwise, will initialize NoopCommitManager
> h4. Summary
> * Configuring manual commits on component level still results in automatic 
> commits when the changing partitions (or all records processed) since 
> *SyncCommitManager* is configured
> ** allowManualCommit is true when calling *doInit* -> initializes 
> kafkaManualCommitFactory as DefaultKafkaManualCommitFactory -> initilize 
> commitManager as SyncCommitManager
> * If allowManualCommit is only configured on endpoint level, it will be false 
> when invoking *doInit*, therefore leaving kafkaManualCommitFactory as null, 
> resulting in NoopCommitManager in *createCommitManager*
> * Configure manual commits on endpoint level



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to