Hi

Create the ActiveMQ ConnectionFactory (CF) yourself and set all of its
options as you like, including the ActiveMQ-specific option for retained
messages, and then configure the ActiveMQ Camel component to use your CF.

On Fri, Feb 7, 2020 at 1:59 PM unknown <mtmmtm999...@gmail.com> wrote:
>
> How can I get retained messages using ActiveMQ and Camel?
> I must set the flag setUseRetroactiveConsumer on the
> ActiveMQConnectionFactory.
> This works, but there must be a better way.
>
> best regards, mtmmtm
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.activemq.pool.PooledConnectionFactory;
> import org.apache.camel.*;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.jms.JmsConfiguration;
> import org.apache.camel.component.jms.JmsEndpoint;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class SendMQTTTest {
>     public void configure() throws Exception {
>         String mqtt =
> "activemq:topic:test?brokerURL=tcp://localhost:61610&allowAdditionalHeaders=true";
>         CamelContext camelContext = new DefaultCamelContext();
>         addMqtt(mqtt, camelContext);
>         camelContext.start();
>         enableRetain(camelContext);
>         camelContext.startRoute("mqtt");
>     }
>
>     private void addMqtt(String mqtt, CamelContext camelContext) throws
> Exception {
>         Processor print = exchange -> {
>             Message camelMessage = exchange.getIn();
>             Object retain_obj = camelMessage.getHeader("ActiveMQ.Retained");
>             boolean retain = retain_obj != null ? (Boolean) retain_obj :
> false;
>             System.out.println("retain=" + retain);
>         };
>         camelContext.addRoutes(new RouteBuilder() {
>             public void configure() {
>
> from(mqtt).routeId("mqtt").autoStartup(false).process(print);
>             }
>         });
>     }
>
>     private void enableRetain(CamelContext context) {
>         for (Route r : context.getRoutes()) {
>             Consumer c = r.getConsumer();
>             if (c.getEndpoint() instanceof JmsEndpoint) {
>                 JmsEndpoint ep = (JmsEndpoint) c.getEndpoint();
>                 JmsConfiguration cfg = ep.getConfiguration();
>                 if (cfg.getConnectionFactory() instanceof
> PooledConnectionFactory) {
>                     PooledConnectionFactory fact =
> (PooledConnectionFactory) cfg.getConnectionFactory();
>                     ActiveMQConnectionFactory a =
> (ActiveMQConnectionFactory) fact.getConnectionFactory();
>                     a.setUseRetroactiveConsumer(true);
>                 }
>             }
>         }
>     }
>
>     public static void main(String... args) throws Exception {
>         SendMQTTTest main = new SendMQTTTest();
>         main.configure();
>         Thread.sleep(10000000);
>     }
> }



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Reply via email to