Thank You Tim -

Is there an example of an interceptor somewhere in the code base I can use as a 
reference to get started on this?

BTW - I did get it working with the Camel Broker Component by switching from 
<recipientList> to <routingSlip>.  I’m not sure why that made a difference, but 
it did.
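
For reference, here is a sketch of the route after that change. It assumes everything else stays exactly as in the wildcard configuration quoted below (same route id, same header, same simple expression); only the <recipientList> element is swapped for <routingSlip>. I have not tried this against other ActiveMQ or Camel versions.

```xml
<route id="in-audit-to-file">
    <from uri="broker://queue:in.adt.*"/>
    <setHeader headerName="MyCustomHeader">
        <constant>MyHeaderValue</constant>
    </setHeader>
    <!-- routingSlip instead of recipientList; the expression is unchanged -->
    <routingSlip>
        <simple>broker://${header[JMSDestination]}</simple>
    </routingSlip>
</route>
```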

> On Apr 28, 2016, at 10:53 PM, Tim Bain <tb...@alumni.duke.edu> wrote:
> 
> I think you want to be using interceptors (
> http://activemq.apache.org/interceptors.html), not consuming from a queue
> and then publishing back to the same queue.  I've always believed that
> embedded Camel routes couldn't be inserted into the middle of accepting a
> message (which is what you really want), but interceptors should be able to
> do that.
> 
> Tim
> 
> On Thu, Apr 28, 2016 at 12:49 PM, Quinn Stevenson <
> qu...@pronoia-solutions.com> wrote:
> 
>> I’m trying to use the ActiveMQ Broker Camel Component to add some JMS user
>> properties to messages as they arrive at the broker.  I’m using a wildcard
>> on the from so I can apply the same logic to a set of topics or queues, but
>> I can’t seem to send the message back to the original queue.
>> 
>> Without wildcards, it works fine - something like this:
>> <beans xmlns="http://www.springframework.org/schema/beans"
>>       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>       xsi:schemaLocation="
>>            http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
>>            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
>> 
>>    <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
>>        <route id="in-audit-to-file">
>>            <from uri="broker://queue:in.adt.epic"/>
>>            <setHeader headerName="MyCustomHeader">
>>                <constant>MyHeaderValue</constant>
>>            </setHeader>
>>            <to uri="broker://queue:in.adt.epic" />
>>        </route>
>> 
>>    </camelContext>
>> 
>> </beans>
>> 
>> However, when I put in the wildcards, I get an IllegalStateException.  The
>> configuration I’m trying looks like this:
>> <beans xmlns="http://www.springframework.org/schema/beans"
>>       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>       xsi:schemaLocation="
>>            http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
>>            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
>> 
>>    <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
>>        <route id="in-audit-to-file">
>>            <from uri="broker://queue:in.adt.*"/>
>>            <setHeader headerName="MyCustomHeader">
>>                <constant>MyHeaderValue</constant>
>>            </setHeader>
>>            <recipientList>
>>                <simple>broker://${header[JMSDestination]}</simple>
>>            </recipientList>
>>        </route>
>> 
>>    </camelContext>
>> 
>> </beans>
>> 
>> And here’s the exception I get
>> 
>> 2016-04-28 12:47:43,721 | ERROR | Failed delivery for (MessageId:
>> ID-macpro-local-53588-1461869253207-0-2 on ExchangeId:
>> ID-macpro-local-53588-1461869253207-0-3). Exhausted after delivery attempt:
>> 1 caught: java.lang.IllegalStateException: Not the original message from
>> the broker Message: Enter some text here for the message body...
>> 
>> Message History
>> ---------------------------------------------------------------------------------------------------------------------------------------
>> RouteId              ProcessorId          Processor                                                      Elapsed (ms)
>> [in-audit-to-file  ] [in-audit-to-file  ] [broker://queue:in.adt.*                                     ] [        23]
>> [in-audit-to-file  ] [setHeader1        ] [setHeader[MyCustomHeader]                                   ] [         3]
>> [in-audit-to-file  ] [recipientList1    ] [recipientList[simple{broker://${header[JMSDestination]}}]   ] [        18]
>> Exchange
>> 
>> ---------------------------------------------------------------------------------------------------------------------------------------
>> Exchange[
>>        Id                  ID-macpro-local-53588-1461869253207-0-3
>>        ExchangePattern     InOnly
>>        Headers
>> {breadcrumbId=ID:macpro.local-53587-1461869252118-4:1:1:1:1,
>> CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=,
>> JMSCorrelationIDAsBytes=, JMSDeliveryMode=1,
>> JMSDestination=queue://in.adt.epic, JMSExpiration=0,
>> JMSMessageID=ID:macpro.local-53587-1461869252118-4:1:1:1:1, JMSPriority=0,
>> JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1461869263684,
>> JMSType=, JMSXGroupID=null, JMSXUserID=null, MyCustomHeader=MyHeaderValue}
>>        BodyType            String
>>        Body                Enter some text here for the message body...
>> ]
>> 
>> Stacktrace
>> ---------------------------------------------------------------------------------------------------------------------------------------
>> | org.apache.camel.processor.DefaultErrorHandler | ActiveMQ VMTransport:
>> vm://localhost#1
>> java.lang.IllegalStateException: Not the original message from the broker
>> Message: Enter some text here for the message body...
>>        at
>> org.apache.activemq.camel.component.broker.BrokerProducer.checkOriginalMessage(BrokerProducer.java:95)[activemq-camel-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:73)[activemq-camel-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.13.2.jar:5.13.2]
>>        at
>> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:668)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:596)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:237)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:178)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.RecipientList.process(RecipientList.java:131)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.16.2.jar:2.16.2]
>>        at
>> org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:158)[activemq-broker-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:546)[activemq-broker-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)[activemq-broker-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)[activemq-broker-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)[activemq-client-5.13.2.jar:5.13.2]
>>        at
>> org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)[activemq-client-5.13.2.jar:5.13.2]
>> 2016-04-28 12:47:43,727 | WARN  | Error processing intercepted message:
>> ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId =
>> ID:macpro.local-53587-1461869252118-4:1:1:1:1, originalDestination = null,
>> originalTransactionId = null, producerId =
>> ID:macpro.local-53587-1461869252118-4:1:1:1, destination =
>> queue://in.adt.epic, transactionId = null, expiration = 0, timestamp =
>> 1461869263684, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>> correlationId = , replyTo = null, persistent = false, type = , priority =
>> 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =
>> false, userID = null, content = null, marshalledProperties = null,
>> dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
>> readOnlyProperties = true, readOnlyBody = true, droppable = false,
>> jmsXGroupFirstForConsumer = false, text = Enter some text here for the
>> message body...}.
>> Exchange[ID-macpro-local-53588-1461869253207-0-1][BrokerJmsMessage[JMSMessageID:
>> ID:macpro.local-53587-1461869252118-4:1:1:1:1]. Caused by:
>> [java.lang.IllegalStateException - Not the original message from the broker
>> Message: Enter some text here for the message body...] |
>> org.apache.activemq.camel.component.broker.BrokerConsumer | ActiveMQ
>> VMTransport: vm://localhost#1
>> 
>> Is there a way to accomplish what I’m after?
>> 
>> 
>> 
>> 
