Dear all,

I was trying to consume events from Azure Event Hub using Camel. The code
snippet for the producer and consumer routes is given below:
As per the camel documentation
https://camel.apache.org/components/latest/azure-eventhubs-component.html#_consumer_example,
do we have any reference on how to implement the EventPosition. Using the
below code (AzureEventHubRouteBuilder), I was able to post to the event
hub, but on consuming from the offset, I get the below warning in the
console. Any pointers on how to consume the events from Azure Event Hub
from Camel ?

[INFO ] 2021-10-04 14:36:12.472 --- [pool-7-thread-1]
PartitionBasedLoadBalancer -  -  - Load balancer already running
[WARN ] 2021-10-04 14:36:22.400 --- [parallel-3] PartitionBasedLoadBalancer
-  -  - Load balancing for event processor failed - Did not observe any
item or terminal signal within 60000ms in 'filter' (and no fallback has
been configured)
Did not observe any item or terminal signal within 60000ms in 'filter' (and
no fallback has been configured)
[WARN ] 2021-10-04 14:36:22.400 --- [parallel-3] EventHubsConsumer -  -  -
Error processing exchange. Exchange[]. Caused by:
[java.util.concurrent.TimeoutException - Did not observe any item or
terminal signal within 60000ms in 'filter' (and no fallback has been
configured)]
java.util.concurrent.TimeoutException: Did not observe any item or terminal
signal within 60000ms in 'filter' (and no fallback has been configured)
at
reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295)


Code snippet below:

@Component
public class AzureEventHubRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        //producer
        from("timer://runOnce?repeatCount=1")
                .log("Event hub route")
                .process(exchange -> {
                   exchange.getIn().setHeader(EventHubsConstants.
PARTITION_KEY, "testPartition");

          exchange.getIn().setHeader(EventHubsConstants.OFFSET, 1);

                    exchange.getIn().setBody("test event");
                })
                .to("azure-eventhubs://?connectionString=Endpoint=sb://
XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy
")
                .log("Success from event hub");

         //consumer
from("azure-eventhubs://?connectionString=Endpoint=sb://
XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy
"+

"&blobAccountName=QQQ&blobAccessKey=ABC=&blobContainerName=yyy&eventPosition=#eventPosition")
               .log("${body}")
                .log("SUCCESS");

    }

    @Bean

public Map<String,EventPosition> eventPosition() {
   return Map.of("testPartition",EventPosition.fromOffset(1));
}

}


Thanks,
Divya

Reply via email to