[ https://issues.apache.org/jira/browse/CAMEL-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570250#comment-16570250 ]
Claus Ibsen commented on CAMEL-12525: ------------------------------------- Can you try with a newer Camel version > camel-kafka component commits the offset as soon as it is retrieved > ------------------------------------------------------------------- > > Key: CAMEL-12525 > URL: https://issues.apache.org/jira/browse/CAMEL-12525 > Project: Camel > Issue Type: Bug > Components: camel-kafka > Affects Versions: 2.21.0 > Environment: Linux > Reporter: Mukesh > Priority: Major > > I am trying the maual commit from consumer below is the code snippet, i want > to consume and commit the message after 2 mins of its arrival in the topic. > My consumer retrieves and checks the time difference if it is above 2 mins > then it should commit. But message once retrieved and not committed manually. > I am expecting it to come back but it does not comeback ever. when i try > creating kafka consumer it works fine > public void configure() throws Exception { > from("kafka:BENEFITSLOADER.LOAD?brokers=xxxx:9092,xxxx:9092,xxxx:9092&groupId=BENEFITSLOADER&consumersCount=1&pollTimeoutMs=1000&autoCommitEnable=false&allowManualCommit=true&maxPollRecords=1") > .process(new Processor() { > @Override > public void process(Exchange exchange) throws Exception { > Long msgDateTime = (Long) > exchange.getIn().getHeaders().get(KafkaConstants.TIMESTAMP); > System.out.println("Message : " + (exchange.getIn().getHeaders())); > System.out.println("Message : " + (exchange.getIn().getBody())); > Date msgDate = new Date(msgDateTime); > Date currentDate = new Date(); > long diff = currentDate.getTime() - msgDate.getTime(); > long diffMinutes = diff / (60 * 1000) % 60; > System.out.println("Difference in Minutes " + diffMinutes); > KafkaManualCommit manualCommit = > exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, > KafkaManualCommit.class); > if(diffMinutes > 2) > { > System.out.println("Commiting Message " + exchange.getIn().getBody()); > manualCommit.commitSync(); > } > } > }); > } > } > > > Code that works fine > > public class TestKafkaConsumer { > static Consumer<String, String> consumer = null; > static ConsumerRecord<String,String> fetchedRecord; > static ConsumerRecords<String, String> records; > public static void main(String... args) { > String topicName = "BENEFITSLOADER.LOAD"; > consumer = createConsumer(); > consumer.subscribe(Collections.singletonList(topicName)); > try { > while (true) { > > if(fetchedRecord == null) > records = consumer.poll(1000); > > > records.forEach(record -> { > fetchedRecord = record; > }); > > if(fetchedRecord != null) > { > Date msgDate = new Date(fetchedRecord.timestamp()); > Date date = new Date(System.currentTimeMillis()); > long diff = date.getTime() - msgDate.getTime(); > long diffMinutes = diff / (60 * 1000) % 60; > > System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", > fetchedRecord.key(), fetchedRecord.value(), > fetchedRecord.partition(), fetchedRecord.offset()); > if(diffMinutes > 2) > { > System.out.printf("Consumer Record Commiting:(%s, %s, %d, %d)\n", > fetchedRecord.key(), fetchedRecord.value(), > fetchedRecord.partition(), fetchedRecord.offset()); > consumer.commitSync(); > System.out.println("Commited"); > fetchedRecord = null; > } > } > } > } > catch (Exception ex) { > ex.printStackTrace(); > } finally { > consumer.close(); > } > } > private static Consumer<String, String> createConsumer() { > Properties props = new Properties(); > props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > "xxx:9092,xxx:9092,xxx:9093"); > props.put(ConsumerConfig.GROUP_ID_CONFIG, "BENEFITSLOADER"); > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class.getName()); > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class.getName()); > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); > return new KafkaConsumer<>(props); > } > } -- This message was sent by Atlassian JIRA (v7.6.3#76005)