Hi 

**I also posted this question on StackOverflow** => 
https://stackoverflow.com/q/63519628/8035582


I "monitor" the number of consecutive failures in my Camel processing pipeline 
with a Camel RoutePolicy.

When a threshold of failures is reached, I want to pause the processing for a 
configured amount of time because it probably means that the data from another 
system is not yet ready and therefore every message fails.
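
To make the counting part concrete, here is a minimal sketch of the
bookkeeping, independent of Camel and Kafka. The class name
`ConsecutiveFailureGate` and its methods are hypothetical and only
illustrate the idea; in a real setup the RoutePolicy callbacks would
presumably feed it, and the threshold and pause duration would come from
configuration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not a Camel or Kafka class): counts consecutive
// failures and opens a pause window once the threshold is reached.
final class ConsecutiveFailureGate {
    private final int threshold;
    private final Duration pauseDuration;
    private int consecutiveFailures = 0;
    private Instant pausedUntil = null;

    ConsecutiveFailureGate(int threshold, Duration pauseDuration) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
        this.pauseDuration = pauseDuration;
    }

    // A successfully processed message breaks the failure streak.
    synchronized void onSuccess() {
        consecutiveFailures = 0;
    }

    // Records a failure. Returns true when this failure reaches the
    // threshold, i.e. when the caller should pause consumption.
    synchronized boolean onFailure(Instant now) {
        consecutiveFailures++;
        if (consecutiveFailures >= threshold) {
            consecutiveFailures = 0;
            pausedUntil = now.plus(pauseDuration);
            return true;
        }
        return false;
    }

    // True while the pause window is still open at the given time.
    synchronized boolean isPaused(Instant now) {
        return pausedUntil != null && now.isBefore(pausedUntil);
    }
}
```

The caller would pause consumption when `onFailure` returns true and
resume once `isPaused` returns false. How to reach the consumer from the
policy is exactly the open question below.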

Since the source of my pipeline is a Kafka topic, I should not just stop the 
whole route because the broker would assume my consumer died and rebalance.

The best way to "pause" topic consumption seems to be to call pause[1] on the 
native KafkaConsumer (the Kafka client class, not Camel's wrapper). That way, 
the consumer keeps polling the broker and stays in the consumer group, but it 
does not fetch any messages from the paused partitions.

**How can I access the native KafkaConsumer from the RoutePolicy context to 
call the pause and resume methods?**


The spring-kafka listener containers offer these methods[2], but if I used a 
Spring consumer, I would need to connect it to my Camel routes, and committing 
messages would get hairy.

[1] 
https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause-java.util.Collection-
[2] 
https://docs.spring.io/spring-kafka/docs/2.2.14.RELEASE/reference/html/#pause-resume

Thanks 
Stephan

