Hi Guys, I've been trying to sort this issue. I've created a custom aggregation strategy instead of relying on the inbuilt "groupExchanges()". My aggregation strategy is shown below:
@Service( value = "keAggregationStrategy" ) public class KEAggregationStrategy implements IKEAggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody = newExchange.getIn().getBody(); ArrayList<Object> list = null; if(oldExchange == null) { list = new ArrayList<Object>(); list.add(newBody); newExchange.getIn().setBody(list); return newExchange; } else { list = oldExchange.getIn().getBody(ArrayList.class); list.add(newBody); return oldExchange; } } } Now, when I run the app, the following exception is thrown by ActiveMQ: FYI: My environment is ActiveMQ 5.8, Camel 2.11.0. I'm not using embedded ActiveMQ. javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: com.package.models.bc.ke.MyEntityClass On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.oke...@gmail.com> wrote: > Hi Guys, > > I have the following Java DSL: > > from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100") > .routeId(fileType + "_ValidQ-To-DB") > .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy") > .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end() > .aggregate(header("CRBOriginalFileName")) > .completionSize(5000) > .parallelProcessing() > .groupExchanges() > .to("bean:keBouncedChequeLoader"); > > The destination is a bean. The bean is as shown below: > > public void process(Exchange exchange) throws Exception { > List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE, > List.class); > Session session = SessionFactoryUtils.getSession(sessionFactory, true); > Transaction tx = session.beginTransaction(); > for(int i = 0; i < exchanges.size(); i++) { > KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody(); > session.save(exchanges.get(i)); > FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"), > bc.getClientNumber() + "\n", true); > if( i % 20 == 0) { > session.flush(); > session.clear(); > } > } > tx.commit(); > session.close(); > } > > The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the > error > > "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"... > > I'm not sure what I'm doing wrong. Any assistance will be appreciated very > much. > > Kind Regards, > Okello Nelson. > -- Kind Regards, Okello Nelson +254 722 137 826 cn.oke...@gmail.com