My Solution:
==========

Guys, I managed to sort out my issue in the following manner:

First, I had to copy my application JAR into ActiveMQ's lib directory, so
that the broker has my data transfer objects on its classpath.
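
As a rough illustration of why the class has to be on the reading side's
classpath: the error quoted further down mentions a "Serializable class",
which suggests the body travels as a Java-serialized object. The sketch
below is plain JDK serialization, not ActiveMQ code, and ChequeDto is a
hypothetical stand-in for the real model class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationRoundTrip {

    // Hypothetical DTO standing in for the real model class.
    static class ChequeDto implements Serializable {
        private static final long serialVersionUID = 1L;
        final String clientNumber;

        ChequeDto(String clientNumber) {
            this.clientNumber = clientNumber;
        }
    }

    // Turn the object into bytes, roughly what happens before it is sent.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuild the object; this only works if its class can be loaded here.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class not on the reader's classpath", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ChequeDto copy = (ChequeDto) fromBytes(toBytes(new ChequeDto("C-001")));
        System.out.println(copy.clientNumber);
    }
}
```

If the reading JVM could not load ChequeDto, readObject would fail with a
ClassNotFoundException, which is the same kind of failure the broker
reported.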

Next, I had to create a custom aggregator (see my previous post). All my
custom aggregator does is append the body of each incoming exchange to a
list. Each body is a business object (a model class).

Thirdly, in my route definition, in addition to completionSize, I also had
to set completionTimeout. Once all exchanges have been received, the last
group can be smaller than completionSize, so the size condition is never
met for it. In that case completionTimeout is the only completion
condition that can still be fulfilled.
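
To make the size-or-timeout behaviour concrete, here is a small plain-Java
model. It is not how Camel implements the aggregator; the class name is
hypothetical, and the timeout is simulated by calling onTimeout() by hand
rather than by a real timer.

```java
import java.util.ArrayList;
import java.util.List;

class SizeOrTimeoutBatcher<T> {
    private final int completionSize;
    private final List<List<T>> completed = new ArrayList<>();
    private List<T> current = new ArrayList<>();

    SizeOrTimeoutBatcher(int completionSize) {
        this.completionSize = completionSize;
    }

    // Add one item; the batch completes as soon as it reaches completionSize.
    void add(T item) {
        current.add(item);
        if (current.size() >= completionSize) {
            complete();
        }
    }

    // Stands in for the timeout firing: release whatever is still pending.
    void onTimeout() {
        if (!current.isEmpty()) {
            complete();
        }
    }

    private void complete() {
        completed.add(current);
        current = new ArrayList<>();
    }

    List<List<T>> completedBatches() {
        return completed;
    }

    public static void main(String[] args) {
        SizeOrTimeoutBatcher<Integer> batcher = new SizeOrTimeoutBatcher<>(5);
        for (int i = 0; i < 12; i++) {
            batcher.add(i);
        }
        // Two full batches of 5 are done; the last 2 items are stuck.
        System.out.println(batcher.completedBatches().size()); // prints 2
        batcher.onTimeout();
        // The timeout released the partial batch.
        System.out.println(batcher.completedBatches().size()); // prints 3
    }
}
```

With only a size condition, the last 2 of the 12 items would never be
delivered, which matches the situation described above.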

Lastly, my bean, the one that does batch inserts into a PostgreSQL
database, is just an ordinary processor, and the exchange body contains a
list of my objects. There is no need for Exchange.GROUPED_EXCHANGE; I just
call exchange.getIn().getBody(List.class).
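
Once the body is a plain list, the batch insert is an ordinary loop. The
sketch below assumes a flush after every full group of N saved items plus
one for the remainder; the save and flush callbacks are hypothetical
placeholders for the real persistence calls, and no Camel or Hibernate
classes are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class BatchInsertSketch {

    // Saves every item and calls flush after each full group of flushEvery
    // items, plus once more for a trailing partial group.
    // Returns the number of flushes performed.
    static <T> int saveInBatches(List<T> items, int flushEvery,
                                 Consumer<T> save, Runnable flush) {
        int flushes = 0;
        for (int i = 0; i < items.size(); i++) {
            save.accept(items.get(i));
            if ((i + 1) % flushEvery == 0) {
                flush.run();
                flushes++;
            }
        }
        if (items.size() % flushEvery != 0) {
            flush.run();
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        // Stands in for the list taken from the message body.
        List<String> body = Arrays.asList("a", "b", "c", "d", "e");
        List<String> saved = new ArrayList<>();
        int flushes = saveInBatches(body, 2, saved::add, () -> { });
        System.out.println(saved.size() + " saved, " + flushes + " flushes"); // prints 5 saved, 3 flushes
    }
}
```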

Kind Regards,
Okello Nelson.




On Mon, May 27, 2013 at 5:18 PM, Okello Nelson <cn.oke...@gmail.com> wrote:

> Hi Guys,
>
> I've been trying to sort this issue. I've created a custom aggregation
> strategy instead of relying on the inbuilt "groupExchanges()". My
> aggregation strategy is shown below:
>
> @Service( value = "keAggregationStrategy" )
> public class KEAggregationStrategy implements IKEAggregationStrategy {
>
>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>         Object newBody = newExchange.getIn().getBody();
>         ArrayList<Object> list = null;
>         if (oldExchange == null) {
>             list = new ArrayList<Object>();
>             list.add(newBody);
>             newExchange.getIn().setBody(list);
>             return newExchange;
>         } else {
>             list = oldExchange.getIn().getBody(ArrayList.class);
>             list.add(newBody);
>             return oldExchange;
>         }
>     }
>
> }
>
> FYI: My environment is ActiveMQ 5.8 and Camel 2.11.0. I'm not using
> embedded ActiveMQ. Now, when I run the app, ActiveMQ throws the
> following exception:
>
> javax.jms.JMSException: Failed to build body from content. Serializable class 
> not available to broker. Reason: java.lang.ClassNotFoundException: 
> com.package.models.bc.ke.MyEntityClass
>
>
>
>
> On Mon, May 27, 2013 at 3:08 PM, Okello Nelson <cn.oke...@gmail.com> wrote:
>
>> Hi Guys,
>>
>> I have the following Java DSL:
>>
>> from(queuePrefix + fileType + "_ValidQ?concurrentConsumers=100")
>>     .routeId(fileType + "_ValidQ-To-DB")
>>     .onException(Exception.class)
>>         .redeliveryPolicyRef("redeliverypolicy")
>>         .to(queuePrefix + fileType + "_DBLoading_ErrorQ")
>>     .end()
>>     .aggregate(header("CRBOriginalFileName"))
>>         .completionSize(5000)
>>         .parallelProcessing()
>>         .groupExchanges()
>>     .to("bean:keBouncedChequeLoader");
>>
>> The destination is a bean. The bean is as shown below:
>>
>> public void process(Exchange exchange) throws Exception {
>>     List<Exchange> exchanges =
>>         exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
>>     Session session = SessionFactoryUtils.getSession(sessionFactory, true);
>>     Transaction tx = session.beginTransaction();
>>     for (int i = 0; i < exchanges.size(); i++) {
>>         KEBouncedCheque bc =
>>             (KEBouncedCheque) exchanges.get(i).getIn().getBody();
>>         // Save the entity carried in the message body.
>>         session.save(bc);
>>         FileUtils.writeStringToFile(new File("C:/tmp/bc.txt"),
>>             bc.getClientNumber() + "\n", true);
>>         if (i % 20 == 0) {
>>             session.flush();
>>             session.clear();
>>         }
>>     }
>>     tx.commit();
>>     session.close();
>> }
>>
>> The message doesn't seem to reach this bean. In ActiveMQ, I'm seeing the
>> error
>>
>> "Unknown message type [org.apache.activemq.command.ActiveMQMessage]"...
>>
>> I'm not sure what I'm doing wrong. Any assistance will be appreciated
>> very much.
>>
>> Kind Regards,
>> Okello Nelson.
>>
>
>
>
> --
> Kind Regards,
> Okello Nelson
> +254 722 137 826
> cn.oke...@gmail.com
>



-- 
Kind Regards,
Okello Nelson
+254 722 137 826
cn.oke...@gmail.com
