[ 
https://issues.apache.org/jira/browse/CAMEL-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570250#comment-16570250
 ] 

Claus Ibsen commented on CAMEL-12525:
-------------------------------------

Can you try with a newer Camel version?

> camel-kafka component commits the offset as soon as it is retrieved
> -------------------------------------------------------------------
>
>                 Key: CAMEL-12525
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12525
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.21.0
>         Environment: Linux
>            Reporter: Mukesh
>            Priority: Major
>
> I am trying manual commit from the consumer; the code snippet is below. I want 
> to consume a message and commit it only 2 minutes after its arrival in the topic. 
> My consumer retrieves the message and checks the time difference; if it is above 
> 2 minutes, it should commit. However, once a message has been retrieved and not 
> committed manually, I expect it to come back, but it never comes back. When I try 
> creating a plain Kafka consumer instead, it works fine.
> public void configure() throws Exception {
> from("kafka:BENEFITSLOADER.LOAD?brokers=xxxx:9092,xxxx:9092,xxxx:9092&groupId=BENEFITSLOADER&consumersCount=1&pollTimeoutMs=1000&autoCommitEnable=false&allowManualCommit=true&maxPollRecords=1")
> .process(new Processor() {
>  @Override
>  public void process(Exchange exchange) throws Exception {
>  Long msgDateTime = (Long) 
> exchange.getIn().getHeaders().get(KafkaConstants.TIMESTAMP);
>  System.out.println("Message : " + (exchange.getIn().getHeaders()));
>  System.out.println("Message : " + (exchange.getIn().getBody()));
>  Date msgDate = new Date(msgDateTime);
>  Date currentDate = new Date();
>  long diff = currentDate.getTime() - msgDate.getTime();
>  long diffMinutes = diff / (60 * 1000) % 60;
>  System.out.println("Difference in Minutes " + diffMinutes);
>  KafkaManualCommit manualCommit = 
> exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, 
> KafkaManualCommit.class);
> if(diffMinutes > 2)
>  {
>  System.out.println("Commiting Message " + exchange.getIn().getBody()); 
>  manualCommit.commitSync(); 
>  } 
>  }
>  });
>  }
>  }
>  
>  
> Code that works fine
>  
> public class TestKafkaConsumer {
>  static Consumer<String, String> consumer = null;
>  static ConsumerRecord<String,String> fetchedRecord; 
>  static ConsumerRecords<String, String> records;
>  public static void main(String... args) {
> String topicName = "BENEFITSLOADER.LOAD";
>  consumer = createConsumer();
>  consumer.subscribe(Collections.singletonList(topicName));
> try {
>  while (true) {
>  
>  if(fetchedRecord == null)
>  records = consumer.poll(1000);
>  
>  
>  records.forEach(record -> { 
>  fetchedRecord = record; 
>  });
>  
>  if(fetchedRecord != null)
>  {
>  Date msgDate = new Date(fetchedRecord.timestamp());
>  Date date = new Date(System.currentTimeMillis());
>  long diff = date.getTime() - msgDate.getTime();
>  long diffMinutes = diff / (60 * 1000) % 60;
>  
>  System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
>  fetchedRecord.key(), fetchedRecord.value(),
>  fetchedRecord.partition(), fetchedRecord.offset());
>  if(diffMinutes > 2)
>  {
>  System.out.printf("Consumer Record Commiting:(%s, %s, %d, %d)\n",
>  fetchedRecord.key(), fetchedRecord.value(),
>  fetchedRecord.partition(), fetchedRecord.offset());
>  consumer.commitSync();
>  System.out.println("Commited");
>  fetchedRecord = null;
>  }
>  } 
>  }
>  }
> catch (Exception ex) {
>  ex.printStackTrace();
>  } finally { 
>  consumer.close();
>  }
> }
> private static Consumer<String, String> createConsumer() {
>  Properties props = new Properties();
>  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "xxx:9092,xxx:9092,xxx:9093");
>  props.put(ConsumerConfig.GROUP_ID_CONFIG, "BENEFITSLOADER");
>  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
>  return new KafkaConsumer<>(props);
>  }
> }



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to