Hi Thanks for sharing your solution.
On Tue, May 28, 2013 at 9:30 AM, Okello Nelson <cn.oke...@gmail.com> wrote: > My Solution: > ========== > > Guys, I managed to sort out my issue in the following manner: > > I had to copy my app jar into the ActiveMQ's lib directory. This was so > that ActiveMQ can have access to my data transfer objects. > > Next, I had to create a custom Aggregator (look at my previous post). In my > custom aggregator, all I was doing is append to a list the body of an > exchange that comes in. This body represented a business object (model > class). > > Thirdly, in my route definition, in addition to having completionSize, I > also had to include completionTimeout. This was so that when the number of > exchanges could not reach my completionSize (when all exchanges have been > received and there are no more), then the only completion condition that > could be fulfilled is the completionTimeout. > > Lastly, in my bean, the one that does batch inserts into a PostgreSQL > database was just an ordinary processor, and the exchange contained a list > of my objects. Nothing like Exchange.GROUPED_EXCHANGES. Just the > exchange.getBody(List<MyModelClass.class>). > > Kind Regards, > Okello Nelson. > > > > > On Mon, May 27, 2013 at 5:18 PM, Okello Nelson <cn.oke...@gmail.com> wrote: > >> Hi Guys, >> >> I've been trying to sort this issue. I've created a custom aggregation >> strategy instead of relying on the inbuilt "groupExchanges()". My >> aggregation strategy is shown below: >> >> @Service( value = "keAggregationStrategy" ) >> public class KEAggregationStrategy implements IKEAggregationStrategy { >> >> public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { >> Object newBody = newExchange.getIn().getBody(); >> ArrayList<Object> list = null; >> if(oldExchange == null) { >> list = new ArrayList<Object>(); >> list.add(newBody); >> newExchange.getIn().setBody(list); >> return newExchange; >> } else { >> list = oldExchange.getIn().getBody(ArrayList.class); >> list.add(newBody); >> return oldExchange; >> } >> } >> >> } >> >> Now, when I run the app, the following exception is thrown by ActiveMQ: >> FYI: My environment is ActiveMQ 5.8, Camel 2.11.0. I'm not using embedded >> ActiveMQ. >> >> javax.jms.JMSException: Failed to build body from content. Serializable >> class not available to broker. Reason: java.lang.ClassNotFoundException: >> com.package.models.bc.ke.MyEntityClass >> >> >> >> >> On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.oke...@gmail.com>wrote: >> >>> Hi Guys, >>> >>> I have the following Java DSL: >>> >>> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100") >>> .routeId(fileType + "_ValidQ-To-DB") >>> .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy") >>> .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end() >>> .aggregate(header("CRBOriginalFileName")) >>> .completionSize(5000) >>> .parallelProcessing() >>> .groupExchanges() >>> .to("bean:keBouncedChequeLoader"); >>> >>> The destination is a bean. The bean is as shown below: >>> >>> public void process(Exchange exchange) throws Exception { >>> List<Exchange> exchanges = >>> exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); >>> Session session = SessionFactoryUtils.getSession(sessionFactory, true); >>> Transaction tx = session.beginTransaction(); >>> for(int i = 0; i < exchanges.size(); i++) { >>> KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody(); >>> session.save(exchanges.get(i)); >>> FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"), >>> bc.getClientNumber() + "\n", true); >>> if( i % 20 == 0) { >>> session.flush(); >>> session.clear(); >>> } >>> } >>> tx.commit(); >>> session.close(); >>> } >>> >>> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the >>> error >>> >>> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"... >>> >>> I'm not sure what I'm doing wrong. Any assistance will be appreciated >>> very much. >>> >>> Kind Regards, >>> Okello Nelson. >>> >> >> >> >> -- >> Kind Regards, >> Okello Nelson >> +254 722 137 826 >> cn.oke...@gmail.com >> > > > > -- > Kind Regards, > Okello Nelson > +254 722 137 826 > cn.oke...@gmail.com -- Claus Ibsen ----------------- www.camelone.org: The open source integration conference. Red Hat, Inc. FuseSource is now part of Red Hat Email: cib...@redhat.com Web: http://fusesource.com Twitter: davsclaus Blog: http://davsclaus.com Author of Camel in Action: http://www.manning.com/ibsen