Hello Claus and Thanks for your answer.

Regarding the versions, my ActiveMQ log shows ActiveMQ 5.10.0 and Camel
2.13.1.

I have tried your suggestion (<convertBodyTo type="String"/> before the
<aggregate>). 
It generates an exception when the first aggregated message is attempted to
be sent on the QOUT queue (no aggregated message is sent out at all).
The exception is: "java.lang.IllegalStateException: Not the original message
from the broker Message: A+B+C"

Do you know what this exception means ?
Many Thanks for your help!

Best Regards,
Hedi





LOG EXTRACT -----

 INFO | ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-1 >>>
(InvalAgregation) from(broker://queue:QIN) --> convertBodyTo[String] <<<
Pattern:InOnly, Headers:{JMSXGroupID=null,
JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1,
JMSDestination=queue://QIN, JMSTimestamp=1424957963591, JMSExpiration=0,
JMSReplyTo=null,
breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1,
JMSPriority=0, JMSCorrelationID=, JMSRedelivered=false, JMSDeliveryMode=1,
JMSType=, JMSXUserID=null}, BodyType:String, Body:A
 INFO | ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-1 >>>
(InvalAgregation) convertBodyTo[String] --> aggregate[true] <<<
Pattern:InOnly, Headers:{JMSXGroupID=null, JMSCorrelationID=,
JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1,
JMSXUserID=null, JMSTimestamp=1424957963591, JMSExpiration=0, JMSPriority=0,
JMSType=, JMSRedelivered=false, JMSDestination=queue://QIN,
JMSDeliveryMode=1, JMSReplyTo=null,
breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1},
BodyType:String, Body:A
 INFO | ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-2 >>>
(InvalAgregation) from(broker://queue:QIN) --> convertBodyTo[String] <<<
Pattern:InOnly, Headers:{JMSTimestamp=1424957995869, JMSPriority=0,
JMSType=,
JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:2,
JMSDeliveryMode=1, JMSRedelivered=false, JMSExpiration=0,
breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:2,
JMSDestination=queue://QIN, JMSCorrelationID=, JMSXGroupID=null,
JMSReplyTo=null, JMSXUserID=null}, BodyType:String, Body:B
 INFO | ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-2 >>>
(InvalAgregation) convertBodyTo[String] --> aggregate[true] <<<
Pattern:InOnly,
Headers:{JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:2,
JMSDeliveryMode=1, JMSReplyTo=null, JMSTimestamp=1424957995869, JMSType=,
JMSPriority=0, JMSCorrelationID=,
breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:2,
JMSDestination=queue://QIN, JMSRedelivered=false, JMSXGroupID=null,
JMSXUserID=null, JMSExpiration=0}, BodyType:String, Body:B
 INFO | ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-3 >>>
(InvalAgregation) from(broker://queue:QIN) --> convertBodyTo[String] <<<
Pattern:InOnly, Headers:{JMSCorrelationID=, JMSExpiration=0,
breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:3,
JMSXGroupID=null, JMSType=, JMSDeliveryMode=1, JMSXUserID=null,
JMSRedelivered=false,
JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:3,
JMSDestination=queue://QIN, JMSPriority=0, JMSTimestamp=1424958003518,
JMSReplyTo=null}, BodyType:String, Body:C
 INFO | ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-3 >>>
(InvalAgregation) convertBodyTo[String] --> aggregate[true] <<<
Pattern:InOnly, Headers:{JMSCorrelationID=, JMSXUserID=null, JMSType=,
JMSTimestamp=1424958003518, JMSXGroupID=null, JMSReplyTo=null,
JMSExpiration=0, JMSRedelivered=false,
JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:3,
JMSDestination=queue://QIN, JMSPriority=0,
breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:3,
JMSDeliveryMode=1}, BodyType:String, Body:C
 INFO | ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-4 >>>
(InvalAgregation) aggregate[true] --> broker://queue:QOUT <<<
Pattern:InOnly,
Headers:{breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1,
JMSDestination=queue://QIN, JMSXUserID=null, JMSRedelivered=false,
JMSDeliveryMode=1, JMSCorrelationID=, JMSTimestamp=1424957963591,
JMSReplyTo=null, JMSXGroupID=null, JMSExpiration=0, JMSPriority=0, JMSType=,
JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1},
BodyType:String, Body:A+B+C
ERROR | Failed delivery for (MessageId:
ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1 on ExchangeId:
ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-4). Exhausted after
delivery attempt: 1 caught: java.lang.IllegalStateException: Not the
original message from the broker Message: A+B+C

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                             
                                          
Elapsed (ms)
[InvalAgregation   ] [InvalAgregation   ] [broker://queue:QIN                   
                                        
] [     39921]
[InvalAgregation   ] [convertBodyTo1    ] [convertBodyTo[String]                
                                        
] [         9]
[InvalAgregation   ] [aggregate1        ] [aggregate[true]                      
                                        
] [         2]
[InvalAgregation   ] [to1               ] [broker:queue:QOUT                    
                                        
] [         5]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                 
ID-ncepspdev29-nce-amadeus-net-40798-1424957923187-0-4
        ExchangePattern     InOnly
        Headers            
{breadcrumbId=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1,
CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=,
JMSDeliveryMode=1, JMSDestination=queue://QIN, JMSExpiration=0,
JMSMessageID=ID:ncepspdev29.nce.amadeus.net-40799-1424957924825-3:1:1:1:1,
JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null,
JMSTimestamp=1424957963591, JMSType=, JMSXGroupID=null, JMSXUserID=null}
        BodyType            String
        Body                A+B+C
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: Not the original message from the broker
Message: A+B+C
        at
org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:96)[activemq-camel-5.10.0.jar:5.10.0]
        at
org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.10.0.jar:5.10.0]
        at
org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.10.0.jar:5.10.0]
        at
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:143)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:307)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:138)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:548)[camel-core-2.13.1.jar:2.13.1]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_18]
        at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)[:1.6.0_18]
        at
java.util.concurrent.FutureTask.run(FutureTask.java:138)[:1.6.0_18]
        at
org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)[camel-core-2.13.1.jar:2.13.1]
        at
java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:78)[:1.6.0_18]
        at
org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:540)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.aggregate.AggregateProcessor.doProcess(AggregateProcessor.java:262)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:179)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.13.1.jar:2.13.1]
        at
org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.10.0.jar:5.10.0]
        at
org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.10.0.jar:5.10.0]
        at
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)[activemq-broker-5.10.0.jar:5.10.0]
        at
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)[activemq-broker-5.10.0.jar:5.10.0]

----- END



--
View this message in context: 
http://camel.465427.n5.nabble.com/Aggregator-output-not-aggregated-tp5763177p5763229.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to