I am trying to build a Camel Quarkus example that uses only the Vert.x IO
threads on both the front side and the back side. On the front side I have
from("platform-http:/xxxx"), which sends the exchange to a multicast with
three routes. Each route uses an async processor that uses the Quarkus
RESTEasy Reactive HTTP clients to make the outbound calls and return
CompletionStages. The multicast uses a GroupedExchangeAggregationStrategy to
collect the list of exchanges. The last step sends that list to an async
processor, which uses it to aggregate the backside responses. However,
according to the log output and the debugger, everything runs on
vert.x-worker-thread-0, which is a worker thread and not an IO thread.

GroupedExchangeAggregationStrategy aggregationStrategy = new GroupedExchangeAggregationStrategy();

from("platform-http:/movie?produces=application/json")
    .routeId("platform-http")
    .to("log:Before Multicast")
    .multicast(aggregationStrategy)
        .parallelAggregate()
        .to("direct:movies", "direct:episodes", "direct:people")
    .end()
    .process(aggregateProcessor) // Combine the responses
    .to("log:platform-http Route");

from("direct:movies").routeId("movies")
    .process(movieProcessor);

from("direct:episodes").routeId("episodes")
    .process(episodesProcessor);

from("direct:people").routeId("people")
    .process(peopleProcessor);
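
For illustration only, here is a minimal sketch of the pattern the three backside processors are described as following: start the outbound call, return immediately, and signal completion from the CompletionStage. It is not the actual processor code. To keep it runnable with the JDK alone, Callback is a hypothetical stand-in for Camel's AsyncCallback, the AtomicReference stands in for the exchange body, and the CompletableFuture stands in for the REST client call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackBridgeSketch {

    /** Hypothetical stand-in for Camel's AsyncCallback, reduced to one method. */
    public interface Callback {
        void done(boolean doneSync);
    }

    /**
     * Registers a completion handler on the pending call and returns at once.
     * Returning false means "not finished yet"; the callback is invoked later,
     * on whichever thread completes the stage, with the matching flag (false).
     */
    public static boolean process(CompletionStage<String> call,
                                  AtomicReference<String> body,
                                  Callback callback) {
        call.whenComplete((result, error) -> {
            // Store the response (or a failure marker) before signalling completion.
            body.set(error == null ? result : "error: " + error.getMessage());
            callback.done(false);
        });
        return false;
    }

    public static void main(String[] args) {
        // Assumption: this future plays the role of the reactive REST client call.
        CompletableFuture<String> pending = new CompletableFuture<>();
        AtomicReference<String> body = new AtomicReference<>();

        boolean doneSync = process(pending, body,
                sync -> System.out.println("callback fired, body=" + body.get()));
        System.out.println("returned " + doneSync + ", body so far=" + body.get());

        // Completing the future runs the handler on the completing thread.
        pending.complete("movies");
    }
}
```

In this sketch the value returned from process and the flag passed to done agree (both false), and no thread blocks while the call is outstanding.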
==========================================================================================
@ApplicationScoped
public class AggregateProcessor implements AsyncProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);

    @NonBlocking
    public boolean process(Exchange exchange, AsyncCallback callback) {
        ResponseHolder aResponseHolder = new ResponseHolder();
        processAllExchanges(exchange, aResponseHolder);
        setExchangeBodyToResponseHolderContents(exchange, aResponseHolder);
        LOG.info("AggregateProcessor Done");
        callback.done(false);
        return false;
    }

    // Rest of the class (including the helper methods called above) not shown.
}

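
As a small aid for reading the logs, the sketch below classifies a thread by its name. It assumes the default Vert.x thread naming, where event-loop (IO) threads are named vert.x-eventloop-thread-N and worker threads are named vert.x-worker-thread-N. ThreadKindSketch and classify are hypothetical names and are not part of Camel, Quarkus, or Vert.x.

```java
public class ThreadKindSketch {

    public enum Kind { EVENT_LOOP, WORKER, OTHER }

    /** Classifies a thread name, assuming the default Vert.x name prefixes. */
    public static Kind classify(String threadName) {
        if (threadName.startsWith("vert.x-eventloop-thread-")) {
            return Kind.EVENT_LOOP;
        }
        if (threadName.startsWith("vert.x-worker-thread-")) {
            return Kind.WORKER;
        }
        return Kind.OTHER;
    }

    public static void main(String[] args) {
        // The thread name reported in the log output above.
        System.out.println(classify("vert.x-worker-thread-0")); // prints WORKER

        // Inside a processor, the current thread could be checked the same way.
        System.out.println(classify(Thread.currentThread().getName()));
    }
}
```

Calling classify(Thread.currentThread().getName()) at the start of each processor would show, per route, whether that step runs on an event-loop thread or a worker thread.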
Keith Link