Hi

Thanks for sharing your solution.

On Tue, May 28, 2013 at 9:30 AM, Okello Nelson <cn.oke...@gmail.com> wrote:
> My Solution:
> ==========
>
> Guys, I managed to sort out my issue in the following manner:
>
> I had to copy my app jar into the ActiveMQ's lib directory. This was so
> that ActiveMQ can have access to my data transfer objects.
>
> Next, I had to create a custom Aggregator (look at my previous post). In my
> custom aggregator, all I was doing is append to  a list the body of an
> exchange that comes in. This body represented a business object  (model
> class).
>
> Thirdly, in my route definition, in addition to having completionSize, I
> also had to include completionTimeout. This was so that when the number of
> exchanges could not reach my completionSize (when all exchanges have been
> received and there are no more), then the only completion condition that
> could be fulfilled is the completionTimeout.
>
> Lastly, in my bean, the one that does batch inserts into a PostgreSQL
> database was just an ordinary processor, and the exchange contained a list
> of my objects. Nothing like Exchange.GROUPED_EXCHANGES. Just the
> exchange.getBody(List<MyModelClass.class>).
>
> Kind Regards,
> Okello Nelson.
>
>
>
>
> On Mon, May 27, 2013 at 5:18 PM, Okello Nelson <cn.oke...@gmail.com> wrote:
>
>> Hi Guys,
>>
>> I've been trying to sort this issue. I've created a custom aggregation
>> strategy instead of relying on the inbuilt "groupExchanges()". My
>> aggregation strategy is shown below:
>>
>> @Service( value = "keAggregationStrategy" )
>> public class KEAggregationStrategy implements IKEAggregationStrategy {
>>
>> public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>>  Object newBody = newExchange.getIn().getBody();
>> ArrayList<Object> list = null;
>>  if(oldExchange == null) {
>> list = new ArrayList<Object>();
>> list.add(newBody);
>>  newExchange.getIn().setBody(list);
>> return newExchange;
>>  } else {
>> list = oldExchange.getIn().getBody(ArrayList.class);
>> list.add(newBody);
>>  return oldExchange;
>> }
>>  }
>>
>> }
>>
>> Now, when I run the app, the following exception is thrown by ActiveMQ:
>> FYI: My environment is ActiveMQ 5.8, Camel 2.11.0. I'm not using embedded
>> ActiveMQ.
>>
>> javax.jms.JMSException: Failed to build body from content. Serializable 
>> class not available to broker. Reason: java.lang.ClassNotFoundException: 
>> com.package.models.bc.ke.MyEntityClass
>>
>>
>>
>>
>> On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.oke...@gmail.com>wrote:
>>
>>> Hi Guys,
>>>
>>> I have the following Java DSL:
>>>
>>> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100")
>>>  .routeId(fileType + "_ValidQ-To-DB")
>>>  .onException(Exception.class).redeliveryPolicyRef("redeliverypolicy")
>>> .to(queuePrefix + fileType + "_DBLoading_ErrorQ").end()
>>>  .aggregate(header("CRBOriginalFileName"))
>>> .completionSize(5000)
>>> .parallelProcessing()
>>>  .groupExchanges()
>>> .to("bean:keBouncedChequeLoader");
>>>
>>> The destination is a bean. The bean is as shown below:
>>>
>>> public void process(Exchange exchange) throws Exception {
>>> List<Exchange> exchanges =
>>> exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
>>>  Session session = SessionFactoryUtils.getSession(sessionFactory, true);
>>> Transaction tx = session.beginTransaction();
>>>  for(int i = 0; i < exchanges.size(); i++) {
>>> KEBouncedCheque bc = (KEBouncedCheque) exchanges.get(i).getIn().getBody();
>>>  session.save(exchanges.get(i));
>>> FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"),
>>> bc.getClientNumber() + "\n", true);
>>>  if( i % 20 == 0) {
>>> session.flush();
>>> session.clear();
>>>  }
>>> }
>>>  tx.commit();
>>>  session.close();
>>>  }
>>>
>>> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the
>>> error
>>>
>>> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"...
>>>
>>> I'm not sure what I'm doing wrong. Any assistance will be appreciated
>>> very much.
>>>
>>> Kind Regards,
>>> Okello Nelson.
>>>
>>
>>
>>
>> --
>> Kind Regards,
>> Okello Nelson
>> +254 722 137 826
>> cn.oke...@gmail.com
>>
>
>
>
> --
> Kind Regards,
> Okello Nelson
> +254 722 137 826
> cn.oke...@gmail.com



-- 
Claus Ibsen
-----------------
www.camelone.org: The open source integration conference.

Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cib...@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Reply via email to