My Solution: ========== Guys, I managed to sort out my issue in the following manner:
I had to copy my app jar into the ActiveMQ's lib directory. This was so that ActiveMQ can have access to my data transfer objects. Next, I had to create a custom Aggregator (look at my previous post). In my custom aggregator, all I was doing is append to a list the body of an exchange that comes in. This body represented a business object (model class). Thirdly, in my route definition, in addition to having completionSize, I also had to include completionTimeout. This was so that when the number of exchanges could not reach my completionSize (when all exchanges have been received and there are no more), then the only completion condition that could be fulfilled is the completionTimeout. Lastly, in my bean, the one that does batch inserts into a PostgreSQL database was just an ordinary processor, and the exchange contained a list of my objects. Nothing like Exchange.GROUPED_EXCHANGES. Just the exchange.getBody(List<MyModelClass.class>). Kind Regards, Okello Nelson. On Mon, May 27, 2013 at 5:18 PM, Okello Nelson <cn.oke...@gmail.com> wrote: > Hi Guys, > > I've been trying to sort this issue. I've created a custom aggregation > strategy instead of relying on the inbuilt "groupExchanges()". My > aggregation strategy is shown below: > > @Service( value = "keAggregationStrategy" ) > public class KEAggregationStrategy implements IKEAggregationStrategy { > > public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { > Object newBody = newExchange.getIn().getBody(); > ArrayList<Object> list = null; > if(oldExchange == null) { > list = new ArrayList<Object>(); > list.add(newBody); > newExchange.getIn().setBody(list); > return newExchange; > } else { > list = oldExchange.getIn().getBody(ArrayList.class); > list.add(newBody); > return oldExchange; > } > } > > } > > Now, when I run the app, the following exception is thrown by ActiveMQ: > FYI: My environment is ActiveMQ 5.8, Camel 2.11.0. I'm not using embedded > ActiveMQ. > > javax.jms.JMSException: Failed to build body from content. Serializable class > not available to broker. Reason: java.lang.ClassNotFoundException: > com.package.models.bc.ke.MyEntityClass > > > > > On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.oke...@gmail.com>wrote: > >> Hi Guys, >> >> I have the following Java DSL: >> >> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100") >> .routeId(fileType + "_ValidQ-To-DB") >> .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy") >> .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end() >> .aggregate(header("CRBOriginalFileName")) >> .completionSize(5000) >> .parallelProcessing() >> .groupExchanges() >> .to("bean:keBouncedChequeLoader"); >> >> The destination is a bean. The bean is as shown below: >> >> public void process(Exchange exchange) throws Exception { >> List<Exchange> exchanges = >> exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); >> Session session = SessionFactoryUtils.getSession(sessionFactory, true); >> Transaction tx = session.beginTransaction(); >> for(int i = 0; i < exchanges.size(); i++) { >> KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody(); >> session.save(exchanges.get(i)); >> FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"), >> bc.getClientNumber() + "\n", true); >> if( i % 20 == 0) { >> session.flush(); >> session.clear(); >> } >> } >> tx.commit(); >> session.close(); >> } >> >> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the >> error >> >> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"... >> >> I'm not sure what I'm doing wrong. Any assistance will be appreciated >> very much. >> >> Kind Regards, >> Okello Nelson. >> > > > > -- > Kind Regards, > Okello Nelson > +254 722 137 826 > cn.oke...@gmail.com > -- Kind Regards, Okello Nelson +254 722 137 826 cn.oke...@gmail.com