Any time, Claus.

On Tue, May 28, 2013 at 10:38 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:

> Hi
>
> Thanks for sharing your solution.
>
> On Tue, May 28, 2013 at 9:30 AM, Okello Nelson <cn.oke...@gmail.com>
> wrote:
> > My Solution:
> > ==========
> >
> > Guys, I managed to sort out my issue in the following manner:
> >
> > I had to copy my app jar into the ActiveMQ's lib directory. This was so
> > that ActiveMQ can have access to my data transfer objects.
> >
> > Next, I had to create a custom Aggregator (look at my previous post). In
> my
> > custom aggregator, all I was doing is append to  a list the body of an
> > exchange that comes in. This body represented a business object  (model
> > class).
> >
> > Thirdly, in my route definition, in addition to having completionSize, I
> > also had to include completionTimeout. This was so that when the number
> of
> > exchanges could not reach my completionSize (when all exchanges have been
> > received and there are no more), then the only completion condition that
> > could be fulfilled is the completionTimeout.
> >
> > Lastly, in my bean, the one that does batch inserts into a PostgreSQL
> > database was just an ordinary processor, and the exchange contained a
> list
> > of my objects. Nothing like Exchange.GROUPED_EXCHANGES. Just the
> > exchange.getBody(List<MyModelClass.class>).
> >
> > Kind Regards,
> > Okello Nelson.
> >
> >
> >
> >
> > On Mon, May 27, 2013 at 5:18 PM, Okello Nelson <cn.oke...@gmail.com>
> wrote:
> >
> >> Hi Guys,
> >>
> >> I've been trying to sort this issue. I've created a custom aggregation
> >> strategy instead of relying on the inbuilt "groupExchanges()". My
> >> aggregation strategy is shown below:
> >>
> >> @Service( value = "keAggregationStrategy" )
> >> public class KEAggregationStrategy implements IKEAggregationStrategy {
> >>
> >> public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
> >>  Object newBody = newExchange.getIn().getBody();
> >> ArrayList<Object> list = null;
> >>  if(oldExchange == null) {
> >> list = new ArrayList<Object>();
> >> list.add(newBody);
> >>  newExchange.getIn().setBody(list);
> >> return newExchange;
> >>  } else {
> >> list = oldExchange.getIn().getBody(ArrayList.class);
> >> list.add(newBody);
> >>  return oldExchange;
> >> }
> >>  }
> >>
> >> }
> >>
> >> Now, when I run the app, the following exception is thrown by ActiveMQ:
> >> FYI: My environment is ActiveMQ 5.8, Camel 2.11.0. I'm not using
> embedded
> >> ActiveMQ.
> >>
> >> javax.jms.JMSException: Failed to build body from content. Serializable
> class not available to broker. Reason: java.lang.ClassNotFoundException:
> com.package.models.bc.ke.MyEntityClass
> >>
> >>
> >>
> >>
> >> On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.oke...@gmail.com
> >wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> I have the following Java DSL:
> >>>
> >>> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100")
> >>>  .routeId(fileType + "_ValidQ-To-DB")
> >>>  .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy")
> >>> .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end()
> >>>  .aggregate(header("CRBOriginalFileName"))
> >>> .completionSize(5000)
> >>> .parallelProcessing()
> >>>  .groupExchanges()
> >>> .to("bean:keBouncedChequeLoader");
> >>>
> >>> The destination is a bean. The bean is as shown below:
> >>>
> >>> public void process(Exchange exchange) throws Exception {
> >>> List<Exchange> exchanges =
> >>> exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
> >>>  Session session = SessionFactoryUtils.getSession(sessionFactory,
> true);
> >>> Transaction tx = session.beginTransaction();
> >>>  for(int i = 0; i < exchanges.size(); i++) {
> >>> KEBouncedCheque bc = (KEBouncedCheque)
> exchanges.get(i).getIn().getBody();
> >>>  session.save(exchanges.get(i));
> >>> FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"),
> >>> bc.getClientNumber() + "\n", true);
> >>>  if( i % 20 == 0) {
> >>> session.flush();
> >>> session.clear();
> >>>  }
> >>> }
> >>>  tx.commit();
> >>>  session.close();
> >>>  }
> >>>
> >>> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing
> the
> >>> error
> >>>
> >>> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"...
> >>>
> >>> I'm not sure what I'm doing wrong. Any assistance will be appreciated
> >>> very much.
> >>>
> >>> Kind Regards,
> >>> Okello Nelson.
> >>>
> >>
> >>
> >>
> >> --
> >> Kind Regards,
> >> Okello Nelson
> >> +254 722 137 826
> >> cn.oke...@gmail.com
> >>
> >
> >
> >
> > --
> > Kind Regards,
> > Okello Nelson
> > +254 722 137 826
> > cn.oke...@gmail.com
>
>
>
> --
> Claus Ibsen
> -----------------
> www.camelone.org: The open source integration conference.
>
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> Email: cib...@redhat.com
> Web: http://fusesource.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
>



-- 
Kind Regards,
Okello Nelson
+254 722 137 826
cn.oke...@gmail.com

Reply via email to