Hi Team,

I am using multicast with an AggregationStrategy and also using onException
on my route.

The happy case works fine.

But when there is an exception, my exception processor handles it, sets a
header, and puts the error message in the body of the exchange.

When the call goes back to the aggregator, it checks whether the body of the
received exchange is non-null and of type Class A. If it is not, we know the
exchange was returned by the exception handler, and we return that exchange.
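
To make the intended rule concrete, here is a minimal Camel-free sketch of it. Msg and ClassA are hypothetical stand-ins for the Camel Exchange and my real response type, and the merge step is simplified to appending list entries, so this only illustrates the decision order and is not the real implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class AggregationRuleSketch {

    /** Hypothetical stand-in for "Class A", the normal response body. */
    static final class ClassA {
        final List<String> parts = new ArrayList<>();

        ClassA(String part) {
            parts.add(part);
        }
    }

    /** Hypothetical stand-in for a Camel Exchange; only the body matters here. */
    static final class Msg {
        final Object body;

        Msg(Object body) {
            this.body = body;
        }
    }

    /** True only for a non-null ClassA body (instanceof is false for null). */
    static boolean hasClassABody(Msg msg) {
        return msg.body instanceof ClassA;
    }

    static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            return newMsg; // first reply of the group
        }
        if (!hasClassABody(oldMsg)) {
            return oldMsg; // assumed to already carry the error message
        }
        if (!hasClassABody(newMsg)) {
            return newMsg; // assumed to come from the exception handler
        }
        // Simplified merge: append the new parts to the old body.
        ((ClassA) oldMsg.body).parts.addAll(((ClassA) newMsg.body).parts);
        return oldMsg;
    }

    public static void main(String[] args) {
        Msg details = new Msg(new ClassA("details"));
        Msg summary = new Msg(new ClassA("summary"));
        Msg error = new Msg("Exception of type ConnectionException");

        Msg merged = aggregate(aggregate(null, details), summary);
        System.out.println(((ClassA) merged.body).parts);
        System.out.println(aggregate(merged, error).body);
    }
}
```

In this sketch a null body also fails the ClassA check, so it is returned the same way as an error reply.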

When testing the exception case, my mock endpoint does not receive the
exchange set by the exception processor.

Kindly point out where I am going wrong.

Below is the source code:

Route definition :-------------------------------------

 onException(RecoverableException.class)
                .maximumRedeliveries(2)
                .handled(true)
                .beanRef("myCustomeExceptionHandler","handleException");


   from("{{route.direct.endpoint}}")
            .routeId(ROUTE_ID)
            .multicast(new MyAggregationStrategy())
                .parallelProcessing()
                .timeout(COMPLETION_TIMEOUT)
                .to(GET_DETAILS, GET_SUMMARY, GET_IMAGE)
            .end()
            .marshal(responseFormat);

        from(GET_SUMMARY)
            .choice()
                .when(summaryIsRequested)
                    .beanRef("myCustomeService", "getSummary")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping summary - not requested.")
            .end();

        from(GET_DETAILS)
            .choice()
                .when(detailsAreRequested)
                    .beanRef("myCustomeService", "getDetails")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping details - not requested.")
            .end();

        from(GET_IMAGE)
            .routeId("podImageRoute")
            .choice()
                .when(podImageIsRequested)
                    .beanRef("myCustomeService", "getImage")
            .otherwise()
                .setBody().constant(null)
                .log(LoggingLevel.DEBUG, "Skipping image - not requested.")
            .end();

Exception Handler :-------------------------------------
public class HttpResponseExceptionHandler implements ExceptionHandler {

    @Override
    public void handleRecoverableException(Exchange exchange) {
        Throwable cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
        String failureRouteId = exchange.getProperty(Exchange.FAILURE_ROUTE_ID, String.class);
        String errorMessage = cause.getMessage();
        LOGGER.error("Exception occurred in the route {}. Exception details are: {}", failureRouteId, errorMessage);
        exchange.getIn().removeHeaders("*");
        exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, statusCode);
        exchange.getIn().setBody(errorMessage);
    }
}

Aggregator code: ----------------------------

public class MyCustomeAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        if (oldExchange == null) { // occurs for a new group
            return newExchange;
        }

        Object oldBody; // Object used as a placeholder type
        Object newBody; // Object used as a placeholder type

        if (exchangeHasNonNullBody(oldExchange) && exchangeContainBodyofTypeClassA(oldExchange)) {
            oldBody = oldExchange.getIn().getBody();
        } else {
            // assuming that the body contains the string message set in the exception handler
            return oldExchange;
        }

        if (exchangeHasNonNullBody(newExchange) && exchangeContainBodyofTypeClassA(newExchange)) {
            newBody = newExchange.getIn().getBody();
        } else {
            // assuming that the body contains the string message set in the exception handler
            return newExchange;
        }

        MergeBodyofOldAndNewExchange(oldExchange, oldBody, newBody);

        return oldExchange;
    }
}

----------------------

Test case :---------------------


    @DirtiesContext
    @Test
    public void testExceptionHandeling() throws Exception {

        // Given
        when(myService.getImage(anyString(), anyString()))
            .thenThrow(new ConnectionException("Exception of type ConnectionException"));
        mockEndpointResult.expectedMessageCount(1);

        // When
        try {
            template.sendBody("{{route.direct.endpoint}}", request);
            Thread.sleep(1000); // wait a few moments...
            // Then
            mockEndpointResult.assertIsSatisfied();  // fails here

        } catch (CamelExecutionException ex) {
            // Then
            fail();
        }
    }


-- Regards,
atg roxx
