[ https://issues.apache.org/jira/browse/CAMEL-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Otavio Rodolfo Piske updated CAMEL-13768:
-----------------------------------------
    Fix Version/s:     (was: 3.x)
                   3.12.0

> Seek to specific offset and KafkaConsumer access
> -------------------------------------------------
>
>                 Key: CAMEL-13768
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13768
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-kafka
>    Affects Versions: 2.24.1
>            Reporter: michael elbaz
>            Assignee: Otavio Rodolfo Piske
>            Priority: Major
>             Fix For: 3.12.0
>
>
> 1. Provide a way to rewind the Kafka offset to a specific offset (improve seekTo?)
> There is no way to do that using the camel-kafka component. The main idea is to
> replay older Kafka messages without starting from the beginning.
> For example:
> https://blog.sysco.no/integration/kafka-rewind-consumers-offset/
> {code:java}
> boolean flag = true;
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(100);
>     if (flag) {
>         Map<TopicPartition, Long> query = new HashMap<>();
>         query.put(
>             new TopicPartition("simple-topic-1", 0),
>             Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli());
>         // Get offset from timestamp
>         Map<TopicPartition, OffsetAndTimestamp> result =
>             consumer.offsetsForTimes(query);
>         // Rewind offset to previous position using seek;
>         // skip partitions for which no offset was found (null value)
>         result.entrySet()
>             .stream()
>             .filter(entry -> entry.getValue() != null)
>             .forEach(entry -> consumer.seek(entry.getKey(),
>                 entry.getValue().offset()));
>         flag = false;
>     }
>     for (ConsumerRecord<String, String> record : records)
>         System.out.printf("offset = %d, key = %s, value = %s%n",
>             record.offset(), record.key(), record.value());
> }
> {code}
> 2. Provide a way to access the KafkaConsumer
> Add a Camel header with a reference to the KafkaConsumer, to be able to perform
> some Kafka API calls. We can do this the same way we do with KafkaManualCommit:
> {code:java}
> public void process(Exchange exchange) {
>     KafkaManualCommit manual =
>         exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
>             KafkaManualCommit.class);
>     manual.commitSync();
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
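
One detail worth keeping in mind when rewinding by timestamp: KafkaConsumer.offsetsForTimes is documented to return, per partition, the earliest offset whose timestamp is greater than or equal to the requested one, and null when no such message exists. The following is a minimal sketch of that lookup rule only, assuming an in-memory list of timestamps as a stand-in for one partition (list index = offset). The class RewindSketch and its methods are hypothetical illustrations; they are not part of camel-kafka or kafka-clients, and no broker is involved.

```java
import java.util.List;

// Hypothetical stand-in for a single partition: timestamps.get(i) is the
// timestamp (epoch millis) of the message stored at offset i.
class RewindSketch {

    // Models the documented offsetsForTimes rule: earliest offset whose
    // timestamp is >= targetMs, or null if no message qualifies.
    static Long offsetForTime(List<Long> timestamps, long targetMs) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetMs) {
                return (long) offset;
            }
        }
        return null;
    }

    // Returns the position to read from next: the looked-up offset, or the
    // unchanged current position when the lookup yields null.
    static long rewind(List<Long> timestamps, long currentPosition, long targetMs) {
        Long offset = offsetForTime(timestamps, targetMs);
        return offset != null ? offset : currentPosition;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(1000L, 2000L, 3000L, 4000L);
        long position = 4; // consumer has already read everything

        // A target between two messages rewinds to the later of the two.
        System.out.println("rewind to t=2500: " + rewind(timestamps, position, 2500L));

        // A target newer than every message leaves the position alone.
        System.out.println("rewind to t=5000: " + rewind(timestamps, position, 5000L));
    }
}
```

With a real consumer the same guard means checking each OffsetAndTimestamp value for null before calling seek, since a quiet partition can legitimately have no message newer than the requested time.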