You can force durable subscriptions by setting a flag.  Looks like this may
not have made it into the documentation.

     <networkConnectors>

         <networkConnector
name="edf-to-thirdpartybroker-network-connector"
useVirtualDestSubs="true"
duplex="true"

                                                uri="static:(tcp://
127.0.0.1:61616)" userName="omitted" password="omitted" >

           <staticallyIncludedDestinations>

               <topic physicalName="include.test.bar?forceDurable=true"/>

           </staticallyIncludedDestinations>


        </networkConnector>

    </networkConnectors>

On Mon, Nov 11, 2019 at 8:33 AM Tim Bain <tb...@alumni.duke.edu> wrote:

> OK, thanks for sharing what you did.
>
> Tim
>
> On Mon, Nov 11, 2019, 2:05 AM Cannock, Richard <
> richard.cann...@edfenergy.com> wrote:
>
> > HI Tim
> >
> > Thanks for the advice. In the end, I got the network connector 'promoted'
> > to a durable topic subscriber by adding a temporary Topic to the forward
> > declaration like this:
> >
> > <destinationInterceptors>
> >         <virtualDestinationInterceptor>
> >             <virtualDestinations>
> >                 <compositeTopic forwardOnly="true" name="myTopic">
> >                     <forwardTo>
> >                         <queue physicalName="myqyeue"/>
> >                         <topic physicalName="VirtualTopic.myTopic"/>
> >                     </forwardTo>
> >                 </compositeTopic>
> >             </virtualDestinations>
> >         </virtualDestinationInterceptor>
> >     </destinationInterceptors>
> >
> > Before attaching a durable scubscriber to this topic. Once restarted the
> > broker made the network connector durable.
> >
> > I then removed the   <topic physicalName="VirtualTopic.myTopic"/> and
> > restarted the broker again. The network connector remained durable and
> > messages remained to be sent to the queue even when the broker was
> offline.
> >
> > This is clearly a workaround, so a bit nervous of it's reliability, so
> > clearly an enhancement request to make this a duranble subscription might
> > be best.
> >
> > Thanks
> >
> > Richard
> >
> > -----Original Message-----
> > From: Tim Bain <tb...@alumni.duke.edu>
> > Sent: 08 November 2019 07:00
> > To: ActiveMQ Users <users@activemq.apache.org>
> > Subject: Re: compositeTopic and lost messages - broker restart
> >
> > Durable topic subscriptions are dangerous things, because if the consumer
> > disappears without removing the subscription, the broker will keep the
> > messages for it forever, eventually running the broker out of memory. A
> > durable topic subscription between brokers (for implementing a composite
> > destination) is doubly so, since the subscription isn't held by a
> > traditional consumer who might give assurances of cleaning things up
> before
> > tearing itself down. So it's no surprise to me that a non-durable
> > subscription is the default behavior, since that's the safer behavior.
> >
> > However, I think maybe your real question isn't why this isn't the
> > default, but why there isn't an option to specify the use of a durable
> > subscription if you so choose. Your use case seems like a valid one for
> > that particular feature, so if you submit an enhancement request, perhaps
> > it could be added in a future version.
> >
> > In the meantime, could you make the composite topic in your broker use a
> > different name than the remote topic (call it *myCompositeTopic*), and
> then
> > have an embedded Camel route that uses a durable subscription to consume
> > from *myCompositeTopic* and publishes the message to *sourceTopic*? Then
> > the network of brokers can ensure that there's a durable subscription on
> > the remote broker, and you'll still get the composite destination on your
> > local broker. This has the additional benefit of not having *sourceTopic*
> > be a composite destination on your local broker while being a
> non-composite
> > destination on the remote broker, which is a pretty wonky configuration
> > anyway.
> >
> > Tim
> >
> > On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard <
> > richard.cann...@edfenergy.com> wrote:
> >
> > > HI,
> > >
> > > Further investigations indicate that the resulting network connector
> > > establishes only a non durable topic subscription to the sourceTopic
> > > via the CompositeTopic, which is why we do not get any messages queued
> > > up at source and subsequently delivered received post the networked
> > > broker restart.
> > >
> > > However, this does not explain why this is only a non durable topic
> > > subscription, as I believe this should be a durable topic subscription
> > > by default.
> > >
> > > I have tried this with various ActiveMQ versions upto 5.15.9 and the
> > > behaviour is consistent.
> > >
> > > Any advice?
> > >
> > > Thanks
> > >
> > > Richard
> > >
> > > -----Original Message-----
> > > From: Cannock, Richard <richard.cann...@edfenergy.com>
> > > Sent: 16 October 2019 19:36
> > > To: users@activemq.apache.org
> > > Subject: RE: compositeTopic and lost messages - broker restart
> > >
> > > Apologies, mail server issue meant I wasn't sure that the original
> > > message had gotten through. Obviously it had!
> > >
> > > These are the same issue so only looking for advice on one of them,
> > > thanks.
> > >
> > > _______
> > > From: Justin Bertram [jbert...@apache.org]
> > > Sent: 16 October 2019 17:09
> > > To: users@activemq.apache.org
> > > Subject: Re: compositeTopic and lost messages - broker restart
> > >
> > > Richard, didn't you just send this same email to the ActiveMQ user
> > > list (although with a slightly different subject)? Just want to
> > > clarify if they are the same or different issues.
> > >
> > >
> > > Justin
> > >
> > > On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
> > > richard.cann...@edfenergy.com> wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > We are trying to enlist in a network of brokers arrangement with a
> > > > third party broker hosted externally to us. We wish to receive
> > > > messages from a topic (called *sourceTopic*) and forward to internal
> > > > queues on our local broker for load balancing and throughput
> > > > purposes using virtual destinations, a compositeTopic forwarding to
> > > > two local queues on our local broker.
> > > >
> > > >
> > > >
> > > > In order to avoid implementing ingress firewall rules for the third
> > > > party onto our estate, we have created a duplex network connector so
> > > > that we initiate the outbound tcp connection. We have excluded our
> > > > local broker queues from propagating to the other broker using the
> > > > <excludedDestinations> configuration element.
> > > >
> > > >
> > > >
> > > > Our configuration below works fine, as long as our local broker is
> > > > up, and we have an active consumer. In this case, messages published
> > > > to the third party broker *sourceTopic* are successfully received on
> > > > our composite topic forwarded queues, test1 and test2.  To avoid any
> > > > confusion, the source message producer publishing to *sourceTopic
> > > > *is explicitly using PersistentDelivery (even though I believe that
> > > > is the
> > > default anyway).
> > > >
> > > >
> > > >
> > > > If we restart our broker, any messages that were published to the
> > > > *sourceTopic* in the intervening period that our broker was down,
> > > > are not received onto local forwarded queues test and test2.
> > > >
> > > >
> > > >
> > > > It would appear that the network connector configuration and\or
> > > > compositeTopic do not result in a durable topic subscription to the
> > > > *sourceTopic* on the other broker.
> > > >
> > > >
> > > >
> > > > I can also successfully recreate this behaviour with two local
> > > > instances of Active MQ on my development machine running side by
> > > > side with broker B duplex connected to broker A, so it doesn't
> > > > appear to be an issue with the third party broker either.
> > > >
> > > >
> > > >
> > > > Our configuration (sensitive data removed) is as below:
> > > >
> > > >
> > > >
> > > > Any ideas on how we can resolve this?
> > > >
> > > >
> > > >
> > > > Many thanks
> > > >
> > > >
> > > > Richard
> > > >
> > > >
> > > >
> > > > <beans
> > > >
> > > >   xmlns="http://www.springframework.org/schema/beans";
> > > >
> > > >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
> > > >
> > > >   xsi:schemaLocation="http://www.springframework.org/schema/beans
> > > > http://www.springframework.org/schema/beans/spring-beans.xsd
> > > >
> > > >   http://activemq.apache.org/schema/core
> > > > http://activemq.apache.org/schema/core/activemq-core.xsd";>
> > > >
> > > >
> > > >
> > > >     <!-- Allows us to use system properties as variables in this
> > > > configuration file -->
> > > >
> > > >     <bean
> > > > class="org.springframework.beans.factory.config.PropertyPlaceholderC
> > > > on
> > > > figurer">
> > > >
> > > >         <property name="locations">
> > > >
> > > >
> > > > <value>file:${activemq.conf}/credentials.properties</value>
> > > >
> > > >         </property>
> > > >
> > > >     </bean>
> > > >
> > > >
> > > >
> > > >    <!-- Allows accessing the server log -->
> > > >
> > > >     <bean id="logQuery"
> > > class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> > > >
> > > >           lazy-init="false" scope="singleton"
> > > >
> > > >           init-method="start" destroy-method="stop">
> > > >
> > > >     </bean>
> > > >
> > > >
> > > >
> > > >     <!--
> > > >
> > > >         The <broker> element is used to configure the ActiveMQ
> broker.
> > > >
> > > >     -->
> > > >
> > > >     <broker xmlns="http://activemq.apache.org/schema/core";
> > > > brokerName="localhost" dataDirectory="${activemq.data}"
> > > > useVirtualDestSubs="true">
> > > >
> > > >
> > > >
> > > >         <destinationPolicy>
> > > >
> > > >             <policyMap>
> > > >
> > > >               <policyEntries>
> > > >
> > > >                 <policyEntry topic=">" >
> > > >
> > > >                     <!-- The constantPendingMessageLimitStrategy is
> > > > used to prevent
> > > >
> > > >                          slow topic consumers to block producers and
> > > > affect other consumers
> > > >
> > > >                          by limiting the number of messages that are
> > > > retained
> > > >
> > > >                          For more information, see:
> > > >
> > > >
> > > >
> > > >
> > > > http://activemq.apache.org/slow-consumer-handling.html
> > > >
> > > >
> > > >
> > > >                     -->
> > > >
> > > >                   <pendingMessageLimitStrategy>
> > > >
> > > >                     <constantPendingMessageLimitStrategy
> > > > limit="1000"/>
> > > >
> > > >                   </pendingMessageLimitStrategy>
> > > >
> > > >                 </policyEntry>
> > > >
> > > >
> > > > <policyEntry queue="test" enableAudit="false">
> > > >
> > > >
> > > > <networkBridgeFilterFactory>
> > > >
> > > >
> > > > <conditionalNetworkBridgeFilterFactory
> > > > replayWhenNoConsumers="true"/>
> > > >
> > > >
> > > > </networkBridgeFilterFactory>
> > > >
> > > >
> > > > </policyEntry>
> > > >
> > > >
> > > > <policyEntry topic="*sourceTopic*" enableAudit="false">
> > > >
> > > >
> > > > <networkBridgeFilterFactory>
> > > >
> > > >
> > > > <conditionalNetworkBridgeFilterFactory
> > > > replayWhenNoConsumers="true"/>
> > > >
> > > >
> > > > </networkBridgeFilterFactory>
> > > >
> > > >
> > > > </policyEntry>
> > > >
> > > >               </policyEntries>
> > > >
> > > >             </policyMap>
> > > >
> > > >         </destinationPolicy>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >                                 <destinationInterceptors>
> > > >
> > > >
> > > > <virtualDestinationInterceptor>
> > > >
> > > >
> > > > <virtualDestinations>
> > > >
> > > >
> > > > <compositeTopic forwardOnly="true" name="* sourceTopic*">
> > > >
> > > >
> > > > <forwardTo>
> > > >
> > > >
> > > > <queue physicalName="test" />
> > > >
> > > >
> > > >
> > > > <queue physicalName="test2" />
> > > >
> > > >
> > > > </forwardTo>
> > > >
> > > >
> > > > </compositeTopic>
> > > >
> > > >
> > > > </virtualDestinations>
> > > >
> > > >
> > > > </virtualDestinationInterceptor>
> > > >
> > > >                                 </destinationInterceptors>
> > > >
> > > >
> > > >
> > > >         <!--
> > > >
> > > >             The managementContext is used to configure how ActiveMQ
> > > > is exposed in
> > > >
> > > >             JMX. By default, ActiveMQ uses the MBean server that is
> > > > started by
> > > >
> > > >             the JVM. For more information, see:
> > > >
> > > >
> > > >
> > > >            http://activemq.apache.org/jmx.html
> > > >
> > > >         -->
> > > >
> > > >         <managementContext>
> > > >
> > > >             <managementContext createConnector="false"/>
> > > >
> > > >         </managementContext>
> > > >
> > > >
> > > >
> > > >         <!--
> > > >
> > > >             Configure message persistence for the broker. The
> > > > default persistence
> > > >
> > > >             mechanism is the KahaDB store (identified by the kahaDB
> > tag).
> > > >
> > > >             For more information, see:
> > > >
> > > >
> > > >
> > > >             http://activemq.apache.org/persistence.html
> > > >
> > > >         -->
> > > >
> > > >         <persistenceAdapter>
> > > >
> > > >             <kahaDB directory="${activemq.data}/kahadb"/>
> > > >
> > > >         </persistenceAdapter>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >           <!--
> > > >
> > > >             The systemUsage controls the maximum amount of space the
> > > > broker will
> > > >
> > > >             use before disabling caching and/or slowing down
> producers.
> > > > For more information, see:
> > > >
> > > >             http://activemq.apache.org/producer-flow-control.html
> > > >
> > > >           -->
> > > >
> > > >           <systemUsage>
> > > >
> > > >             <systemUsage>
> > > >
> > > >                 <memoryUsage>
> > > >
> > > >                     <memoryUsage percentOfJvmHeap="70" />
> > > >
> > > >                 </memoryUsage>
> > > >
> > > >                 <storeUsage>
> > > >
> > > >                     <storeUsage limit="100 gb"/>
> > > >
> > > >                 </storeUsage>
> > > >
> > > >                 <tempUsage>
> > > >
> > > >                     <tempUsage limit="50 gb"/>
> > > >
> > > >                 </tempUsage>
> > > >
> > > >             </systemUsage>
> > > >
> > > >         </systemUsage>
> > > >
> > > >
> > > >
> > > >         <!--
> > > >
> > > >             The transport connectors expose ActiveMQ over a given
> > > > protocol to
> > > >
> > > >             clients and other brokers. For more information, see:
> > > >
> > > >
> > > >
> > > >             http://activemq.apache.org/configuring-transports.html
> > > >
> > > >         -->
> > > >
> > > >         <transportConnectors>
> > > >
> > > >             <!-- DOS protection, limit concurrent connections to
> > > > 1000 and frame size to 100MB -->
> > > >
> > > >             <transportConnector name="openwire" uri="tcp://
> > > > 0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > > 48
> > > > 57600
> > > > "/>
> > > >
> > > >             <!--
> > > >
> > > >                                                 <transportConnector
> > > > name="amqp" uri="amqp://
> > > > 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > > > 85
> > > > 7600
> > > > "/>
> > > >
> > > >             <transportConnector name="stomp" uri="stomp://
> > > > 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > > 48
> > > > 57600
> > > > "/>
> > > >
> > > >             <transportConnector name="mqtt" uri="mqtt://
> > > > 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104
> > > > 85
> > > > 7600
> > > > "/>
> > > >
> > > >             <transportConnector name="ws" uri="ws://
> > > > 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=10
> > > > 48
> > > > 57600
> > > > "/>
> > > >
> > > >                                                 -->
> > > >
> > > >         </transportConnectors>
> > > >
> > > >
> > > >
> > > >         <!-- destroy the spring context on shutdown to stop jetty
> > > > -->
> > > >
> > > >         <shutdownHooks>
> > > >
> > > >             <bean xmlns="http://www.springframework.org/schema/beans
> "
> > > > class="org.apache.activemq.hooks.SpringContextHook" />
> > > >
> > > >         </shutdownHooks>
> > > >
> > > >
> > > >
> > > >                                 <networkConnectors>
> > > >
> > > >                                                 <networkConnector
> > > > name="edf-to-thirdpartybroker-network-connector"
> > > >
> > > >
> >  useVirtualDestSubs="true"
> > > >
> > > >                                                 duplex="true"
> > > >
> > > >                                                 uri="static:(tcp://
> > > > 127.0.0.1:61616)" userName="omitted" password="omitted" >
> > > >
> > > >
> > > > <staticallyIncludedDestinations>
> > > >
> > > >
> > > > <topic physicalName="*sourceTopic*"/>
> > > >
> > > >
> > > > </staticallyIncludedDestinations>
> > > >
> > > >
> > > > <excludedDestinations>
> > > >
> > > >
> > > > <queue physicalName=">"/>
> > > >
> > > >
> > > > </excludedDestinations>
> > > >
> > > >
> > > >                                                 </networkConnector>
> > > >
> > > >                                 </networkConnectors>
> > > >
> > > >
> > > >
> > > >                                 <plugins>
> > > >
> > > >
> > > > <simpleAuthenticationPlugin>
> > > >
> > > >                                                   <users>
> > > >
> > > >
> > > > <authenticationUser username="system" password="omitted"
> > > > groups="users,admins"/>
> > > >
> > > >
> > > > <authenticationUser username="queueAUser" password="omitted"
> > > > groups="queueAusers"/>
> > > >
> > > >
> > > > <authenticationUser username="queueBUser" password="omitted"
> > > > groups="queueBusers"/>
> > > >
> > > >                                                   </users>
> > > >
> > > >
> > > > </simpleAuthenticationPlugin>
> > > >
> > > >
> > > >
> > > >
> > > > <authorizationPlugin>
> > > >
> > > >         <map>
> > > >
> > > >           <authorizationMap>
> > > >
> > > >             <authorizationEntries>
> > > >
> > > >               <authorizationEntry queue=">" read="admins"
> > write="admins"
> > > > admin="admins" />
> > > >
> > > >               <authorizationEntry queue="test" read="queueAusers"
> > > > write="queueAusers" admin="queueAUsers" />
> > > >
> > > >
> > > > <authorizationEntry queue="test2" read="queueBusers"
> > write="queueBusers"
> > > > admin="queueBusers" />
> > > >
> > > >
> > > >
> > > >               <authorizationEntry topic=">" read="admins"
> > write="admins"
> > > > admin="admins" />
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >               <authorizationEntry topic="ActiveMQ.Advisory.>"
> > > > read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> > > > admin="admins,queueAusers,queueBusers" />
> > > >
> > > >             </authorizationEntries>
> > > >
> > > >
> > > >
> > > >             <!-- let's assign roles to temporary destinations.
> > > > comment this entry if we don't want any roles assigned to temp
> > > > destinations
> > > > -->
> > > >
> > > >
> > > >
> > > >           </authorizationMap>
> > > >
> > > >         </map>
> > > >
> > > >       </authorizationPlugin>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >                                 </plugins>
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >                 </broker>
> > > >
> > > >
> > > >
> > > >     <!--
> > > >
> > > >         Enable web consoles, REST and Ajax APIs and demos
> > > >
> > > >         The web consoles requires by default login, you can disable
> > > > this in the jetty.xml file
> > > >
> > > >
> > > >
> > > >         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more
> > > > details
> > > >
> > > >     -->
> > > >
> > > >     <import resource="jetty.xml"/>
> > > >
> > > >
> > > >
> > > > </beans>
> > > >
> > > > <!-- END SNIPPET: example -->
> > > >
> > > >
> > > >
> > > > *Richard Cannock*
> > > >
> > > > *Solution Architect*
> > > >
> > > > Business Change and IT
> > > >
> > > > Customers
> > > >
> > > >
> > > >
> > > > *T:* +44 (0) 2081862465
> > > >
> > > > *M:* +44 (0)7729 320505
> > > >
> > > >
> > > >
> > > > Barnwood
> > > >
> > > > Barnett Way
> > > >
> > > > Gloucester, Gloucestershire, GL4 3RS
> > > >
> > > >
> > > >
> > > > [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24]
> > > >
> > > >
> > > >
> > > > edfenergy.com <http://www.edfenergy.com/>
> > > >
> > > >
> > > >
> > > > Please consider the environment before printing this email
> > > >
> > > >
> > > >
> > > > This e-mail and any files transmitted with it are confidential and
> > > > may be protected by legal privilege. If you are not the intended
> > > > recipient, please notify the sender and delete the e-mail from your
> > > system.
> > > > This e-mail has been scanned for malicious content but the internet
> > > > is inherently insecure and EDF Energy Limited can not accept any
> > > > liability for the integrity of this message or its attachments.
> > > > No employee or agent of EDF Energy Limited or any related company is
> > > > authorised to conclude any binding agreement on behalf of EDF Energy
> > > > Limited or any related company by e-mail.
> > > >
> > > > All e-mails sent and received by EDF Energy Limited are monitored to
> > > > ensure compliance with the company's information security policy.
> > > > Executable and script files are not permitted through the EDF Energy
> > > > Limited mail gateway.  EDF Energy does not accept or send mails
> > > > above
> > > > 30 Mb in size.
> > > >
> > > > EDF Energy Limited
> > > > Registered in England and Wales No. 2366852 Registered Office: 90
> > > > Whitfield Street, London W1T 4EZ
> > > >
> > >
> > >
> > > This e-mail and any files transmitted with it are confidential and may
> > > be protected by legal privilege. If you are not the intended
> > > recipient, please notify the sender and delete the e-mail from your
> > system.
> > > This e-mail has been scanned for malicious content but the internet is
> > > inherently insecure and EDF Energy Limited cannot accept any liability
> > > for the integrity of this message or its attachments. No employee or
> > > agent of EDF Energy Limited or any related company is authorised to
> > > conclude any binding agreement on behalf of EDF Energy Limited or any
> > > related company by e-mail.
> > > All e-mails sent and received by EDF Energy Limited are monitored to
> > > ensure compliance with the company's information security policy.
> > > Executable and script files are not permitted through the EDF Energy
> > > Limited mail gateway. EDF Energy does not accept or send mails above
> > > 30 Mb in size.
> > >
> > > EDF Energy Limited Registered in England and Wales No. 2366852
> > > Registered
> > > Office: 90 Whitfield Street, London W1T 4EZ
> > >
> >
> >
> > This e-mail and any files transmitted with it are confidential and may be
> > protected by legal privilege. If you are not the intended recipient,
> please
> > notify the sender and delete the e-mail from your system.
> > This e-mail has been scanned for malicious content but the internet is
> > inherently insecure and EDF Energy Limited cannot accept any liability
> for
> > the integrity of this message or its attachments. No employee or agent of
> > EDF Energy Limited or any related company is authorised to conclude any
> > binding agreement on behalf of EDF Energy Limited or any related company
> by
> > e-mail.
> > All e-mails sent and received by EDF Energy Limited are monitored to
> > ensure compliance with the company's information security policy.
> > Executable and script files are not permitted through the EDF Energy
> > Limited mail gateway. EDF Energy does not accept or send mails above 30
> Mb
> > in size.
> >
> > EDF Energy Limited Registered in England and Wales No. 2366852 Registered
> > Office: 90 Whitfield Street, London W1T 4EZ
> >
>

Reply via email to