[ https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580921#comment-16580921 ]
michael elbaz commented on CAMEL-12732: --------------------------------------- There is a second point even if i d'ont do the manual commit {code:java} .process(Route::commitKafka){code} camel will write into the repo file this is not supposed to happen i think. > Kafka manual commit to file repository doesn't work (using Spring boot) > ----------------------------------------------------------------------- > > Key: CAMEL-12732 > URL: https://issues.apache.org/jira/browse/CAMEL-12732 > Project: Camel > Issue Type: Bug > Components: camel-kafka > Affects Versions: 2.22.0 > Environment: Spring boot > kafka_2.11-1.1.0 > Reporter: michael elbaz > Priority: Major > > I'im trying to save the Kafka offset into FileStateRepository, the offset is > correctly writing but it is not reading at route start so camel will read all > the topic every time > > {code:java} > @Component > public class Route extends RouteBuilder { > @Override > public void configure() throws Exception { > from(kafka()) > .to("log:TEST?level=INFO") > .process(Route::commitKafka); > } > private String kafka() { > String kafkaEndpoint = "kafka:"; > kafkaEndpoint += "topictest"; > kafkaEndpoint += "?brokers="; > kafkaEndpoint += "localhost:9092"; > kafkaEndpoint += "&groupId="; > kafkaEndpoint += "TEST"; > kafkaEndpoint += "&autoOffsetReset="; > kafkaEndpoint += "earliest"; > kafkaEndpoint += "&autoCommitEnable="; > kafkaEndpoint += false; > kafkaEndpoint += "&allowManualCommit="; > kafkaEndpoint += true; > kafkaEndpoint += "&offsetRepository="; > kafkaEndpoint += "#fileStore"; > return kafkaEndpoint; > } > @Bean(name = "fileStore") > private FileStateRepository fileStateRepository() { > FileStateRepository fileStateRepository = > FileStateRepository.fileStateRepository(new > File("/kafka/offset_repo/repo.dat")); > // This will be empty > // System.out.println(fileStateRepository.getCache()); > return fileStateRepository; > } > private static void commitKafka(Exchange exchange) { > KafkaManualCommit manual = > exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, > KafkaManualCommit.class); > manual.commitSync(); > } > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)