I've written some code that I think solves the problem, and I just want to check that I'm not goofing anything up. Also, which exchange should I return from AggregationStrategy.aggregate(), given what I have?

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                // Group messages by the "subject" header and complete each group after 2 messages
                from("jms:analytics.camelqueue")
                    .aggregate(new MyAggregationStrategy()).header("subject")
                    .completionSize(2)
                    .to("exec://FILEPATH?args=");
            }
        });
        context.start();
    }

    private static class MyAggregationStrategy implements AggregationStrategy {
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            // First message of a group: there is nothing to combine with yet
            if (oldExchange == null) {
                return newExchange;
            }

            // and here is where combo stuff goes
            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);

            boolean oldSet = oldBody.contains("set");
            boolean newSet = newBody.contains("set");
            boolean oldFlow = oldBody.contains("flow");
            boolean newFlow = newBody.contains("flow");

            if ((oldSet && newFlow) || (oldFlow && newSet)) {
                // they match so start extractor then return
            } else {
                // no match so do something....
            }
            // not sure yet which exchange to return here (see question above)
        }
    }
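
For what it's worth, here is a minimal sketch of the set/flow check pulled out into a plain helper so it can be tried without a running CamelContext. The class name `SetFlowPairing`, the `merge` method, and the newline separator are hypothetical, not part of Camel or of the code above. As for the return value: Camel passes whatever `aggregate()` returns back in as `oldExchange` on the next call, and sends it on to the next step once the group completes, so a common pattern is to set the combined body on `oldExchange` and return that.

```java
// Hypothetical helper (not part of Camel): holds the set/flow pairing check
// from the question so it can be exercised on plain strings.
final class SetFlowPairing {

    private SetFlowPairing() {
    }

    /** Returns true when one body mentions "set" and the other mentions "flow". */
    static boolean isSetFlowPair(String oldBody, String newBody) {
        // getBody(String.class) can return null, so guard before calling contains()
        if (oldBody == null || newBody == null) {
            return false;
        }
        boolean oldSet = oldBody.contains("set");
        boolean newSet = newBody.contains("set");
        boolean oldFlow = oldBody.contains("flow");
        boolean newFlow = newBody.contains("flow");
        return (oldSet && newFlow) || (oldFlow && newSet);
    }

    /** Joins the two bodies; the newline separator is an assumption. */
    static String merge(String oldBody, String newBody) {
        return oldBody + "\n" + newBody;
    }

    public static void main(String[] args) {
        String oldBody = "set: A";
        String newBody = "flow: B";
        if (isSetFlowPair(oldBody, newBody)) {
            // Inside aggregate() this is where one would call
            // oldExchange.getIn().setBody(merged) and then return oldExchange.
            System.out.println(merge(oldBody, newBody));
        }
    }
}
```

Inside `aggregate()`, the same check would run on the two bodies already read from the exchanges. What to return in the no-match branch depends on what should happen to unmatched messages, so that part is left open here.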