Dear all,

I am trying to consume events from Azure Event Hub using Camel. The code for the producer and consumer routes is given below. The Camel documentation (https://camel.apache.org/components/latest/azure-eventhubs-component.html#_consumer_example) shows a consumer example; is there any reference on how to implement the EventPosition?

Using the code below (AzureEventHubRouteBuilder), I am able to post to the event hub, but when consuming from the offset I get the warnings below in the console. Any pointers on how to consume events from Azure Event Hub with Camel?

[INFO ] 2021-10-04 14:36:12.472 --- [pool-7-thread-1] PartitionBasedLoadBalancer - - - Load balancer already running
[WARN ] 2021-10-04 14:36:22.400 --- [parallel-3] PartitionBasedLoadBalancer - - - Load balancing for event processor failed - Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
[WARN ] 2021-10-04 14:36:22.400 --- [parallel-3] EventHubsConsumer - - - Error processing exchange. Exchange[]. Caused by: [java.util.concurrent.TimeoutException - Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)]
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295)

Code snippet below:

    @Component
    public class AzureEventHubRouteBuilder extends RouteBuilder {

        @Override
        public void configure() throws Exception {
            //producer
            from("timer://runOnce?repeatCount=1")
                .log("Event hub route")
                .process(exchange -> {
                    exchange.getIn().setHeader(EventHubsConstants.PARTITION_KEY, "testPartition");
                    exchange.getIn().setHeader(EventHubsConstants.OFFSET, 1);
                    exchange.getIn().setBody("test event");
                })
                .to("azure-eventhubs://?connectionString=Endpoint=sb:// XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy ")
                .log("Success from event hub");

            //consumer
            from("azure-eventhubs://?connectionString=Endpoint=sb:// XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy "+
                    "&blobAccountName=QQQ&blobAccessKey=ABC=&blobContainerName=yyy&eventPosition=#eventPosition")
                .log("${body}")
                .log("SUCCESS");
        }

        @Bean
        public Map<String,EventPosition> eventPosition() {
            return Map.of("testPartition",EventPosition.fromOffset(1));
        }
    }

Thanks,
Divya