[ 
https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581403#comment-16581403
 ] 

ASF GitHub Bot commented on CAMEL-12732:
----------------------------------------

GitHub user mikadev opened a pull request:

    https://github.com/apache/camel/pull/2475

    CAMEL-12732 Take into account autoCommitEnable

    Check if autocommit is enabled before setting the state
    
    (Fix the manual commit for the state)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mikadev/camel patch-3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/camel/pull/2475.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2475
    
----
commit 880656f64c35cb3f8cdac09236890917d2a0f2e9
Author: elbaz michael <michaelelbaz@...>
Date:   2018-08-15T17:40:22Z

    CAMEL-12732 Take into account autoCommitEnable
    
    Check if autocommit is enabled before setting the state
    
    (Fix the manual commit for the state)

----
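
The check described in the pull request can be illustrated with a small, self-contained sketch. This is not the actual patch: the class `OffsetStateSketch`, the methods `afterBatch` and `manualCommit`, the plain `Map` standing in for the offset repository, and the key `topictest/0` are all assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; this is NOT the camel-kafka consumer code. */
public class OffsetStateSketch {

    /**
     * Called after a batch of records from one partition has been processed.
     * The offset is stored only when auto commit is enabled; with manual
     * commit, the route decides when the offset is persisted.
     */
    static void afterBatch(Map<String, String> offsetRepository, boolean autoCommitEnable,
                           String partitionKey, long lastOffset) {
        if (offsetRepository == null || lastOffset < 0) {
            return; // no repository configured, or nothing was consumed
        }
        if (autoCommitEnable) {
            offsetRepository.put(partitionKey, Long.toString(lastOffset));
        }
    }

    /** Stand-in (assumed) for what a manual commit does when the route calls it. */
    static void manualCommit(Map<String, String> offsetRepository, String partitionKey, long offset) {
        offsetRepository.put(partitionKey, Long.toString(offset));
    }

    public static void main(String[] args) {
        Map<String, String> repo = new HashMap<>();

        // autoCommitEnable=false: the consumer loop must not touch the state
        afterBatch(repo, false, "topictest/0", 41L);
        System.out.println(repo); // prints {}

        // the route commits explicitly, and only then is the offset stored
        manualCommit(repo, "topictest/0", 41L);
        System.out.println(repo); // prints {topictest/0=41}
    }
}
```

With `autoCommitEnable=false`, as in the route from the issue, the state would change only when the route itself commits.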


> Kafka manual commit to file repository doesn't work properly (using Spring 
> boot)
> --------------------------------------------------------------------------------
>
>                 Key: CAMEL-12732
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12732
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>         Environment: Spring boot
> kafka_2.11-1.1.0
>            Reporter: michael elbaz
>            Priority: Major
>
> I'm trying to save the Kafka offset in a FileStateRepository. The offset is 
> written correctly, but it is not read when the route starts, so Camel re-reads 
> the whole topic every time.
>  
> {code:java}
> @Component
> public class Route extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         from(kafka())
>                 .to("log:TEST?level=INFO")
>                 .process(Route::commitKafka);
>     }
>     private String kafka() {
>         String kafkaEndpoint = "kafka:";
>         kafkaEndpoint += "topictest";
>         kafkaEndpoint += "?brokers=";
>         kafkaEndpoint += "localhost:9092";
>         kafkaEndpoint += "&groupId=";
>         kafkaEndpoint += "TEST";
>         kafkaEndpoint += "&autoOffsetReset=";
>         kafkaEndpoint += "earliest";
>         kafkaEndpoint += "&autoCommitEnable=";
>         kafkaEndpoint += false;
>         kafkaEndpoint += "&allowManualCommit=";
>         kafkaEndpoint += true;
>         kafkaEndpoint += "&offsetRepository=";
>         kafkaEndpoint += "#fileStore";
>         return kafkaEndpoint;
>     }
>     @Bean(name = "fileStore")
>     private FileStateRepository fileStateRepository() {
>         FileStateRepository fileStateRepository =
>                 FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
>         // This will be empty
>         // System.out.println(fileStateRepository.getCache());
>         return fileStateRepository;
>     }
>     private static void commitKafka(Exchange exchange) {
>         KafkaManualCommit manual = exchange.getIn().getHeader(
>                 KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>         manual.commitSync();
>     }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
