Hi Team, I am using multicast with an AggregationStrategy and I am also using onException on my route.
The happy path works fine. When an exception occurs, my exception processor handles it, sets a header, and puts the error message in the body of the exchange. When the call returns to the aggregator, it checks whether the body of the received exchange is non-null and of type Class A. If it is not, we know the exchange came from the exception handler, and we return that exchange as is.

When I test the exception case, my mock endpoint does not receive the exchange set by the exception processor. Could you point out where I am going wrong?

Below is the source code:

Route definition :-------------------------------------

    onException(RecoverableException.class)
        .maximumRedeliveries(2)
        .handled(true)
        .beanRef("myCustomeExceptionHandler", "handleException");

    from("{{route.direct.endpoint}}")
        .routeId(ROUTE_ID)
        .multicast(new MyAggregationStrategy())
            .parallelProcessing()
            .timeout(COMPLETION_TIMEOUT)
            .to(GET_DETAILS, GET_SUMMARY, GET_IMAGE)
        .end()
        .marshal(responseFormat);

    from(GET_SUMMARY)
        .choice()
            .when(summaryIsRequested)
                .beanRef("myCustomeService", "getSummary")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping summary - not requested.")
        .end();

    from(GET_DETAILS)
        .choice()
            .when(detailsAreRequested)
                .beanRef("myCustomeService", "getDetails")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping details - not requested.")
        .end();

    from(GET_IMAGE)
        .routeId("podImageRoute")
        .choice()
            .when(podImageIsRequested)
                .beanRef("myCustomeService", "getImage")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping image - not requested.")
        .end();

Exception Handler :------------------------------------------------------------------

    public class HttpResponseExceptionHandler implements ExceptionHandler {

        @Override
        public void handleRecoverableException(Exchange exchange) {
            Throwable cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
            String failureRouteId = exchange.getProperty(Exchange.FAILURE_ROUTE_ID, String.class);
            String errorMessage = cause.getMessage();
            LOGGER.error("Exception occurred in the route {}. Exception details are: {}",
                    failureRouteId, errorMessage);

            // Replace the message with an error response.
            exchange.getIn().removeHeaders("*");
            exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, statusCode);
            exchange.getIn().setBody(errorMessage);
        }
    }

Aggregator code: ----------------------------

    public class MyCustomeAggregationStrategy implements AggregationStrategy {

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                // occurs for a new group
                return newExchange;
            }

            Object oldBody;
            Object newBody;

            if (exchangeHasNonNullBody(oldExchange) && exchangeContainBodyofTypeClassA(oldExchange)) {
                oldBody = oldExchange.getIn().getBody();
            } else {
                // assuming that the body contains the string message set in the exception handler
                return oldExchange;
            }

            if (exchangeHasNonNullBody(newExchange) && exchangeContainBodyofTypeClassA(newExchange)) {
                newBody = newExchange.getIn().getBody();
            } else {
                // assuming that the body contains the string message set in the exception handler
                return newExchange;
            }

            MergeBodyofOldAndNewExchange(oldExchange, oldBody, newBody);
            return oldExchange;
        }
    }

Test case :---------------------

    @DirtiesContext
    @Test
    public void testExceptionHandeling() throws Exception {
        // Given
        when(myService.getImage(anyString(), anyString()))
            .thenThrow(new ConnectionException("Exception of type ConnectionException"));
        mockEndpointResult.expectedMessageCount(1);

        // When
        try {
            template.sendBody("{{route.direct.endpoint}}", request);
            Thread.sleep(1000); // wait a few moments...

            // Then
            mockEndpointResult.assertIsSatisfied(); // fails here
        } catch (CamelExecutionException ex) {
            // Then
            fail();
        }
    }

--
Regards,
atg roxx
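
P.S. In case it makes the intent clearer, here is the rule the aggregator is meant to follow, reduced to a standalone sketch with no Camel types. Msg and ClassA are hypothetical stand-ins for the Exchange and for my real body type, and the merge step is simplified to appending list entries, so please read it as an illustration of the idea and not as the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

class AggregationSketch {

    /** Hypothetical stand-in for the "Class A" body type. */
    static final class ClassA {
        final List<String> parts = new ArrayList<>();

        ClassA(String part) {
            parts.add(part);
        }
    }

    /** Hypothetical stand-in for a Camel Exchange: it only carries a body. */
    static final class Msg {
        final Object body;

        Msg(Object body) {
            this.body = body;
        }
    }

    /** Merges two replies, or passes through the first reply that is not a ClassA. */
    static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            return newMsg; // first reply of a new group
        }
        // instanceof is false for null, so it covers the non-null check as well.
        if (!(oldMsg.body instanceof ClassA)) {
            return oldMsg; // assumed to carry the error message
        }
        if (!(newMsg.body instanceof ClassA)) {
            return newMsg; // assumed to carry the error message
        }
        // Simplified merge: append the new parts to the old body.
        ClassA merged = (ClassA) oldMsg.body;
        merged.parts.addAll(((ClassA) newMsg.body).parts);
        return oldMsg;
    }

    public static void main(String[] args) {
        Msg details = new Msg(new ClassA("details"));
        Msg summary = new Msg(new ClassA("summary"));
        Msg error = new Msg("Exception of type ConnectionException");

        Msg merged = aggregate(aggregate(null, details), summary);
        System.out.println(((ClassA) merged.body).parts); // [details, summary]

        Msg result = aggregate(merged, error);
        System.out.println(result.body); // Exception of type ConnectionException
    }
}
```

Note that in this sketch a null body takes the same path as an error message, because neither is a ClassA.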