Any time, Claus.

On Tue, May 28, 2013 at 10:38 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:

> Hi
>
> Thanks for sharing your solution.
>
> On Tue, May 28, 2013 at 9:30 AM, Okello Nelson <cn.oke...@gmail.com> wrote:
> > My Solution:
> > ==========
> >
> > Guys, I managed to sort out my issue in the following manner:
> >
> > First, I had to copy my app jar into ActiveMQ's lib directory, so that
> > ActiveMQ has access to my data transfer objects.
> >
> > Next, I had to create a custom aggregator (see my previous post). All
> > my custom aggregator does is append the body of each incoming exchange
> > to a list. This body represents a business object (model class).
> >
> > Thirdly, in my route definition, in addition to completionSize, I also
> > had to include completionTimeout. When all exchanges have been received
> > and there are no more, the number of exchanges may never reach my
> > completionSize, and then completionTimeout is the only completion
> > condition that can be fulfilled.
> >
> > Lastly, my bean, the one that does batch inserts into a PostgreSQL
> > database, is just an ordinary processor, and the exchange body contains
> > a list of my objects. No Exchange.GROUPED_EXCHANGE property is involved;
> > I just call exchange.getIn().getBody(List.class), which gives me the
> > list of MyModelClass objects.
> >
> > Kind Regards,
> > Okello Nelson.
> >
> > On Mon, May 27, 2013 at 5:18 PM, Okello Nelson <cn.oke...@gmail.com> wrote:
> >
> >> Hi Guys,
> >>
> >> I've been trying to sort this issue. I've created a custom aggregation
> >> strategy instead of relying on the inbuilt "groupExchanges()". My
> >> aggregation strategy is shown below:
> >>
> >> import java.util.ArrayList;
> >> import org.apache.camel.Exchange;
> >> import org.springframework.stereotype.Service;
> >>
> >> @Service( value = "keAggregationStrategy" )
> >> public class KEAggregationStrategy implements IKEAggregationStrategy {
> >>
> >>     // Collects the body of every incoming exchange into a single list.
> >>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
> >>         Object newBody = newExchange.getIn().getBody();
> >>         ArrayList<Object> list = null;
> >>         if (oldExchange == null) {
> >>             // First exchange of the group: start a new list.
> >>             list = new ArrayList<Object>();
> >>             list.add(newBody);
> >>             newExchange.getIn().setBody(list);
> >>             return newExchange;
> >>         } else {
> >>             // Later exchanges: append to the list built so far.
> >>             list = oldExchange.getIn().getBody(ArrayList.class);
> >>             list.add(newBody);
> >>             return oldExchange;
> >>         }
> >>     }
> >>
> >> }
> >>
> >> Now, when I run the app, ActiveMQ throws the exception below.
> >> FYI: My environment is ActiveMQ 5.8 and Camel 2.11.0. I'm not using
> >> embedded ActiveMQ.
> >>
> >> javax.jms.JMSException: Failed to build body from content. Serializable
> >> class not available to broker. Reason: java.lang.ClassNotFoundException:
> >> com.package.models.bc.ke.MyEntityClass
> >>
> >> On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.oke...@gmail.com> wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> I have the following Java DSL:
> >>>
> >>> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100")
> >>>     .routeId(fileType + "_ValidQ-To-DB")
> >>>     .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy")
> >>>         .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end()
> >>>     .aggregate(header("CRBOriginalFileName"))
> >>>         .completionSize(5000)
> >>>         .parallelProcessing()
> >>>         .groupExchanges()
> >>>     .to("bean:keBouncedChequeLoader");
> >>>
> >>> The destination is a bean. The bean is as shown below:
> >>>
> >>> public void process(Exchange exchange) throws Exception {
> >>>     List<Exchange> exchanges =
> >>>         exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
> >>>     Session session = SessionFactoryUtils.getSession(sessionFactory, true);
> >>>     Transaction tx = session.beginTransaction();
> >>>     for (int i = 0; i < exchanges.size(); i++) {
> >>>         KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody();
> >>>         // Save the entity itself, not the exchange that carries it.
> >>>         session.save(bc);
> >>>         FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"),
> >>>             bc.getClientNumber() + "\n", true);
> >>>         // Flush and clear after every 20 saved entities.
> >>>         if ((i + 1) % 20 == 0) {
> >>>             session.flush();
> >>>             session.clear();
> >>>         }
> >>>     }
> >>>     tx.commit();
> >>>     session.close();
> >>> }
> >>>
> >>> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing
> >>> the error
> >>>
> >>> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"...
> >>>
> >>> I'm not sure what I'm doing wrong. Any assistance will be appreciated
> >>> very much.
> >>>
> >>> Kind Regards,
> >>> Okello Nelson.
>
> --
> Claus Ibsen
> -----------------
> www.camelone.org: The open source integration conference.
>
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> Email: cib...@redhat.com
> Web: http://fusesource.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen

--
Kind Regards,
Okello Nelson
+254 722 137 826
cn.oke...@gmail.com
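
For anyone finding this thread later, here is a rough plain-Java sketch (no Camel on the classpath) of the two ideas in the solution: appending each body to a list, and completing a batch on size or on timeout, whichever comes first. The names BatchSketch, aggregate and isComplete are made up for illustration and are not Camel APIs. The timeout is modelled as an inactivity period, which is my understanding of how completionTimeout behaves.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics the list-appending aggregator and the
// "size OR timeout" completion rule without using Camel itself.
final class BatchSketch {

    // A null oldBody plays the role of the null oldExchange that the
    // aggregator sees for the first message of a group.
    static List<Object> aggregate(List<Object> oldBody, Object newBody) {
        List<Object> list = (oldBody == null) ? new ArrayList<>() : oldBody;
        list.add(newBody);
        return list;
    }

    // A batch completes when it is full, or when it is non-empty and has
    // been idle for at least timeoutMillis.
    static boolean isComplete(int size, int completionSize,
                              long idleMillis, long timeoutMillis) {
        return size >= completionSize
            || (size > 0 && idleMillis >= timeoutMillis);
    }

    public static void main(String[] args) {
        List<Object> batch = null;
        for (String body : new String[] {"a", "b", "c"}) {
            batch = aggregate(batch, body);
        }
        // Only 3 of the 5 expected messages arrived, so the size condition
        // alone never fires; the idle timeout is what releases the batch.
        System.out.println(isComplete(batch.size(), 5, 0, 1000));    // false
        System.out.println(isComplete(batch.size(), 5, 1500, 1000)); // true
    }
}
```

This is why the last, partially filled batch of a file only reaches the loader bean once a timeout is configured alongside the size.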