michael elbaz created CAMEL-12732: ------------------------------------- Summary: Kafka manual commit to file repository doesn't work Key: CAMEL-12732 URL: https://issues.apache.org/jira/browse/CAMEL-12732 Project: Camel Issue Type: Bug Components: camel-kafka Affects Versions: 2.22.0 Environment: Spring boot
kafka_2.11-1.1.0 Reporter: michael elbaz I'im trying to save the Kafka offset into FileStateRepository, the offset is correctly writing but it is not reading at route start so camel will read all the topic every time {code:java} @Component public class Route extends RouteBuilder { @Override public void configure() throws Exception { from(kafka()) .to("log:TEST?level=INFO") .process(Route::commitKafka); } private String kafka() { String kafkaEndpoint = "kafka:"; kafkaEndpoint += "topictest"; kafkaEndpoint += "?brokers="; kafkaEndpoint += "localhost:9092"; kafkaEndpoint += "&groupId="; kafkaEndpoint += "TEST"; kafkaEndpoint += "&autoOffsetReset="; kafkaEndpoint += "earliest"; kafkaEndpoint += "&autoCommitEnable="; kafkaEndpoint += false; kafkaEndpoint += "&allowManualCommit="; kafkaEndpoint += true; kafkaEndpoint += "&offsetRepository="; kafkaEndpoint += "#fileStore"; return kafkaEndpoint; } @Bean(name = "fileStore") private FileStateRepository fileStateRepository() { FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat")); // This will be empty // System.out.println(fileStateRepository.getCache()); return fileStateRepository; } private static void commitKafka(Exchange exchange) { KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); manual.commitSync(); } } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)