Hi guys,
I have a question about Kafka endpoint.
I’m not able to find a way to force a starting offset for a consumer group when
starting a route.
I would like to set the offset to the current timestamp for every new consumer
group (or better to be able to specify a logic to evaluate the starting point)
The option “autoOffsetReset=latest” doesn’t work because the group is a new one
and, in any case, I would like to skip to the latest available offset
regardless of possible commits.
In other words, I’d like to skip all the old messages for a topic once a new
group is added.
Something like:
KafkaConsumer<String, Object> kafkaConsumer = ….
….
Map<TopicPartition, Long> mapTopicPartition = new HashMap<>();
Set<TopicPartition> partitions = kafkaConsumer.assignment();
partitions.forEach(partition -> mapTopicPartition.put(partition, timestamp));
kafkaConsumer.offsetsForTimes(mapTopicPartition);
initialized = true;
Every suggestion will be appreciated!
Thanks!
Riccardo