[ 
https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582019#comment-16582019
 ] 

ASF GitHub Bot commented on CAMEL-12732:
----------------------------------------

dmvolod edited a comment on issue #2476: CAMEL-12732 Take into account 
autoCommitEnable for the state
URL: https://github.com/apache/camel/pull/2476#issuecomment-413438782
 
 
   If some parameters are incompatible you can detect this and raise an 
exception or waiting.
   Please also add a JUnit test for this option. You can add real test and mark 
it with @Ignored. I'm planning to add testcontainers option for Kafka component 
and able to run real test against container if possible.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka manual commit to file repository doesn't work properly (using Spring 
> boot)
> --------------------------------------------------------------------------------
>
>                 Key: CAMEL-12732
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12732
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>         Environment: Spring boot
> kafka_2.11-1.1.0
>            Reporter: michael elbaz
>            Priority: Major
>
> I'im trying to save the Kafka offset into FileStateRepository, the offset is 
> correctly writing but it is not reading at route start so camel will read all 
> the topic every time
>  
> {code:java}
> @Component
> public class Route extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         from(kafka())
>                 .to("log:TEST?level=INFO")
>                 .process(Route::commitKafka);
>     }
>     private String kafka() {
>         String kafkaEndpoint = "kafka:";
>         kafkaEndpoint += "topictest";
>         kafkaEndpoint += "?brokers=";
>         kafkaEndpoint += "localhost:9092";
>         kafkaEndpoint += "&groupId=";
>         kafkaEndpoint += "TEST";
>         kafkaEndpoint += "&autoOffsetReset=";
>         kafkaEndpoint += "earliest";
>         kafkaEndpoint += "&autoCommitEnable=";
>         kafkaEndpoint += false;
>         kafkaEndpoint += "&allowManualCommit=";
>         kafkaEndpoint += true;
>         kafkaEndpoint += "&offsetRepository=";
>         kafkaEndpoint += "#fileStore";
>         return kafkaEndpoint;
>     }
>     @Bean(name = "fileStore")
>     private FileStateRepository fileStateRepository() {
>         FileStateRepository fileStateRepository = 
> FileStateRepository.fileStateRepository(new 
> File("/kafka/offset_repo/repo.dat"));
>         // This will be empty
>         // System.out.println(fileStateRepository.getCache());
>         return fileStateRepository;
>     }
>     private static void commitKafka(Exchange exchange) {
>         KafkaManualCommit manual = 
> exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, 
> KafkaManualCommit.class);
>         manual.commitSync();
>     }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to