[ 
https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580921#comment-16580921
 ] 

michael elbaz edited comment on CAMEL-12732 at 8/15/18 10:55 AM:
-----------------------------------------------------------------

There is a second point even if i don't do the manual commit
{code:java}
.process(Route::commitKafka){code}
camel will write into the repo file this is not supposed to happen i think.


was (Author: michael992):
There is a second point even if i d'ont do the manual commit 
{code:java}
.process(Route::commitKafka){code}
camel will write into the repo file this is not supposed to happen i think.

> Kafka manual commit to file repository doesn't work (using Spring boot)
> -----------------------------------------------------------------------
>
>                 Key: CAMEL-12732
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12732
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>         Environment: Spring boot
> kafka_2.11-1.1.0
>            Reporter: michael elbaz
>            Priority: Major
>
> I'im trying to save the Kafka offset into FileStateRepository, the offset is 
> correctly writing but it is not reading at route start so camel will read all 
> the topic every time
>  
> {code:java}
> @Component
> public class Route extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         from(kafka())
>                 .to("log:TEST?level=INFO")
>                 .process(Route::commitKafka);
>     }
>     private String kafka() {
>         String kafkaEndpoint = "kafka:";
>         kafkaEndpoint += "topictest";
>         kafkaEndpoint += "?brokers=";
>         kafkaEndpoint += "localhost:9092";
>         kafkaEndpoint += "&groupId=";
>         kafkaEndpoint += "TEST";
>         kafkaEndpoint += "&autoOffsetReset=";
>         kafkaEndpoint += "earliest";
>         kafkaEndpoint += "&autoCommitEnable=";
>         kafkaEndpoint += false;
>         kafkaEndpoint += "&allowManualCommit=";
>         kafkaEndpoint += true;
>         kafkaEndpoint += "&offsetRepository=";
>         kafkaEndpoint += "#fileStore";
>         return kafkaEndpoint;
>     }
>     @Bean(name = "fileStore")
>     private FileStateRepository fileStateRepository() {
>         FileStateRepository fileStateRepository = 
> FileStateRepository.fileStateRepository(new 
> File("/kafka/offset_repo/repo.dat"));
>         // This will be empty
>         // System.out.println(fileStateRepository.getCache());
>         return fileStateRepository;
>     }
>     private static void commitKafka(Exchange exchange) {
>         KafkaManualCommit manual = 
> exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, 
> KafkaManualCommit.class);
>         manual.commitSync();
>     }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to